diff options
Diffstat (limited to 'src/rocksdb/db/db_sst_test.cc')
-rw-r--r-- | src/rocksdb/db/db_sst_test.cc | 1097 |
1 files changed, 1097 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_sst_test.cc b/src/rocksdb/db/db_sst_test.cc new file mode 100644 index 00000000..dcd5847e --- /dev/null +++ b/src/rocksdb/db/db_sst_test.cc @@ -0,0 +1,1097 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/sst_file_manager.h" +#include "util/sst_file_manager_impl.h" + +namespace rocksdb { + +class DBSSTTest : public DBTestBase { + public: + DBSSTTest() : DBTestBase("/db_sst_test") {} +}; + +#ifndef ROCKSDB_LITE +// A class which remembers the name of each flushed file. +class FlushedFileCollector : public EventListener { + public: + FlushedFileCollector() {} + ~FlushedFileCollector() override {} + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + std::lock_guard<std::mutex> lock(mutex_); + flushed_files_.push_back(info.file_path); + } + + std::vector<std::string> GetFlushedFiles() { + std::lock_guard<std::mutex> lock(mutex_); + std::vector<std::string> result; + for (auto fname : flushed_files_) { + result.push_back(fname); + } + return result; + } + void ClearFlushedFiles() { + std::lock_guard<std::mutex> lock(mutex_); + flushed_files_.clear(); + } + + private: + std::vector<std::string> flushed_files_; + std::mutex mutex_; +}; +#endif // ROCKSDB_LITE + +TEST_F(DBSSTTest, DontDeletePendingOutputs) { + Options options; + options.env = env_; + options.create_if_missing = true; + DestroyAndReopen(options); + + // Every time we write to a table file, call FOF/POF with full DB scan. This + // will make sure our pending_outputs_ protection work correctly + std::function<void()> purge_obsolete_files_function = [&]() { + JobContext job_context(0); + dbfull()->TEST_LockMutex(); + dbfull()->FindObsoleteFiles(&job_context, true /*force*/); + dbfull()->TEST_UnlockMutex(); + dbfull()->PurgeObsoleteFiles(job_context); + job_context.Clean(); + }; + + env_->table_write_callback_ = &purge_obsolete_files_function; + + for (int i = 0; i < 2; ++i) { + ASSERT_OK(Put("a", "begin")); + ASSERT_OK(Put("z", "end")); + ASSERT_OK(Flush()); + } + + // If pending output guard does not work correctly, PurgeObsoleteFiles() will + // delete the file that Compaction is trying to create, causing this: error + // db/db_test.cc:975: IO error: + // /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory + Compact("a", "b"); +} + +// 1 Create some SST files by inserting K-V pairs into DB +// 2 Close DB and change suffix from ".sst" to ".ldb" for every other SST file +// 3 Open DB and check if all key can be read +TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) { + Options options = CurrentOptions(); + options.write_buffer_size = 110 << 10; // 110KB + options.num_levels = 4; + DestroyAndReopen(options); + + Random rnd(301); + int key_id = 0; + for (int i = 0; i < 10; ++i) { + GenerateNewFile(&rnd, &key_id, false); + } + Flush(); + Close(); + int const num_files = GetSstFileCount(dbname_); + ASSERT_GT(num_files, 0); + + std::vector<std::string> filenames; + GetSstFiles(env_, dbname_, &filenames); + int num_ldb_files = 0; + for (size_t i = 0; i < filenames.size(); ++i) { + if (i & 1) { + continue; + } + std::string const rdb_name = dbname_ + "/" + filenames[i]; + std::string const ldb_name = Rocks2LevelTableFileName(rdb_name); + ASSERT_TRUE(env_->RenameFile(rdb_name, ldb_name).ok()); + ++num_ldb_files; + } + ASSERT_GT(num_ldb_files, 0); + ASSERT_EQ(num_files, GetSstFileCount(dbname_)); + + Reopen(options); + for (int k = 0; k < key_id; ++k) { + ASSERT_NE("NOT_FOUND", Get(Key(k))); + } + Destroy(options); +} + +#ifndef ROCKSDB_LITE +TEST_F(DBSSTTest, DontDeleteMovedFile) { + // This test triggers move compaction and verifies that the file is not + // deleted when it's part of move compaction + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.max_bytes_for_level_base = 1024 * 1024; // 1 MB + options.level0_file_num_compaction_trigger = + 2; // trigger compaction when we have 2 files + DestroyAndReopen(options); + + Random rnd(301); + // Create two 1MB sst files + for (int i = 0; i < 2; ++i) { + // Create 1MB sst file + for (int j = 0; j < 100; ++j) { + ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024))); + } + ASSERT_OK(Flush()); + } + // this should execute both L0->L1 and L1->(move)->L2 compactions + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,0,1", FilesPerLevel(0)); + + // If the moved file is actually deleted (the move-safeguard in + // ~Version::Version() is not there), we get this failure: + // Corruption: Can't access /000009.sst + Reopen(options); +} + +// This reproduces a bug where we don't delete a file because when it was +// supposed to be deleted, it was blocked by pending_outputs +// Consider: +// 1. current file_number is 13 +// 2. compaction (1) starts, blocks deletion of all files starting with 13 +// (pending outputs) +// 3. file 13 is created by compaction (2) +// 4. file 13 is consumed by compaction (3) and file 15 was created. Since file +// 13 has no references, it is put into VersionSet::obsolete_files_ +// 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13 +// is deleted from obsolete_files_ set. +// 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by +// pending outputs since compaction (1) is still running. It is not deleted and +// it is not present in obsolete_files_ anymore. Therefore, we never delete it. +TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 2 * 1024 * 1024; // 2 MB + options.max_bytes_for_level_base = 1024 * 1024; // 1 MB + options.level0_file_num_compaction_trigger = + 2; // trigger compaction when we have 2 files + options.max_background_flushes = 2; + options.max_background_compactions = 2; + + OnFileDeletionListener* listener = new OnFileDeletionListener(); + options.listeners.emplace_back(listener); + + Reopen(options); + + Random rnd(301); + // Create two 1MB sst files + for (int i = 0; i < 2; ++i) { + // Create 1MB sst file + for (int j = 0; j < 100; ++j) { + ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024))); + } + ASSERT_OK(Flush()); + } + // this should execute both L0->L1 and L1->(move)->L2 compactions + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,0,1", FilesPerLevel(0)); + + test::SleepingBackgroundTask blocking_thread; + port::Mutex mutex_; + bool already_blocked(false); + + // block the flush + std::function<void()> block_first_time = [&]() { + bool blocking = false; + { + MutexLock l(&mutex_); + if (!already_blocked) { + blocking = true; + already_blocked = true; + } + } + if (blocking) { + blocking_thread.DoSleep(); + } + }; + env_->table_write_callback_ = &block_first_time; + // Insert 2.5MB data, which should trigger a flush because we exceed + // write_buffer_size. The flush will be blocked with block_first_time + // pending_file is protecting all the files created after + for (int j = 0; j < 256; ++j) { + ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024))); + } + blocking_thread.WaitUntilSleeping(); + + ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr)); + + ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); + std::vector<LiveFileMetaData> metadata; + db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(metadata.size(), 1U); + auto file_on_L2 = metadata[0].name; + listener->SetExpectedFileName(dbname_ + file_on_L2); + + ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, + true /* disallow trivial move */)); + ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); + + // finish the flush! + blocking_thread.WakeUp(); + blocking_thread.WaitUntilDone(); + dbfull()->TEST_WaitForFlushMemTable(); + // File just flushed is too big for L0 and L1 so gets moved to L2. + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0)); + + metadata.clear(); + db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(metadata.size(), 2U); + + // This file should have been deleted during last compaction + ASSERT_EQ(Status::NotFound(), env_->FileExists(dbname_ + file_on_L2)); + listener->VerifyMatchedCount(1); +} + +TEST_F(DBSSTTest, DBWithSstFileManager) { + std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get()); + + int files_added = 0; + int files_deleted = 0; + int files_moved = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* /*arg*/) { files_added++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* /*arg*/) { files_deleted++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 25; i++) { + GenerateNewRandomFile(&rnd); + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + // Verify that we are tracking all sst files in dbname_ + ASSERT_EQ(sfm->GetTrackedFiles(), GetAllSSTFiles()); + } + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + auto files_in_db = GetAllSSTFiles(); + // Verify that we are tracking all sst files in dbname_ + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + // Verify the total files size + uint64_t total_files_size = 0; + for (auto& file_to_size : files_in_db) { + total_files_size += file_to_size.second; + } + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + // We flushed at least 25 files + ASSERT_GE(files_added, 25); + // Compaction must have deleted some files + ASSERT_GT(files_deleted, 0); + // No files were moved + ASSERT_EQ(files_moved, 0); + + Close(); + Reopen(options); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + // Verify that we track all the files again after the DB is closed and opened + Close(); + sst_file_manager.reset(NewSstFileManager(env_)); + options.sst_file_manager = sst_file_manager; + sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get()); + + Reopen(options); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, RateLimitedDelete) { + Destroy(last_options_); + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBSSTTest::RateLimitedDelete:1", + "DeleteScheduler::BackgroundEmptyTrash"}, + }); + + std::vector<uint64_t> penalties; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::BackgroundEmptyTrash:Wait", + [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + // Turn timed wait into a simulated sleep + uint64_t* abs_time_us = static_cast<uint64_t*>(arg); + int64_t cur_time = 0; + env_->GetCurrentTime(&cur_time); + if (*abs_time_us > static_cast<uint64_t>(cur_time)) { + env_->addon_time_.fetch_add(*abs_time_us - + static_cast<uint64_t>(cur_time)); + } + + // Randomly sleep shortly + env_->addon_time_.fetch_add( + static_cast<uint64_t>(Random::GetTLSInstance()->Uniform(10))); + + // Set wait until time to before current to force not to sleep. + int64_t real_cur_time = 0; + Env::Default()->GetCurrentTime(&real_cur_time); + *abs_time_us = static_cast<uint64_t>(real_cur_time); + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + env_->no_slowdown_ = true; + env_->time_elapse_only_sleep_ = true; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + // Need to disable stats dumping and persisting which also use + // RepeatableThread, one of whose member variables is of type + // InstrumentedCondVar. The callback for + // InstrumentedCondVar::TimedWaitInternal can be triggered by stats dumping + // and persisting threads and cause time_spent_deleting measurement to become + // incorrect. + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; + options.env = env_; + + int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec + Status s; + options.sst_file_manager.reset( + NewSstFileManager(env_, nullptr, "", 0, false, &s, 0)); + ASSERT_OK(s); + options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec); + auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get()); + sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1); + + WriteOptions wo; + wo.disableWAL = true; + ASSERT_OK(TryReopen(options)); + // Create 4 files in L0 + for (char v = 'a'; v <= 'd'; v++) { + ASSERT_OK(Put("Key2", DummyString(1024, v), wo)); + ASSERT_OK(Put("Key3", DummyString(1024, v), wo)); + ASSERT_OK(Put("Key4", DummyString(1024, v), wo)); + ASSERT_OK(Put("Key1", DummyString(1024, v), wo)); + ASSERT_OK(Put("Key4", DummyString(1024, v), wo)); + ASSERT_OK(Flush()); + } + // We created 4 sst files in L0 + ASSERT_EQ("4", FilesPerLevel(0)); + + std::vector<LiveFileMetaData> metadata; + db_->GetLiveFilesMetaData(&metadata); + + // Compaction will move the 4 files in L0 to trash and create 1 L1 file + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + uint64_t delete_start_time = env_->NowMicros(); + // Hold BackgroundEmptyTrash + TEST_SYNC_POINT("DBSSTTest::RateLimitedDelete:1"); + sfm->WaitForEmptyTrash(); + uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time; + + uint64_t total_files_size = 0; + uint64_t expected_penlty = 0; + ASSERT_EQ(penalties.size(), metadata.size()); + for (size_t i = 0; i < metadata.size(); i++) { + total_files_size += metadata[i].size; + expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec); + ASSERT_EQ(expected_penlty, penalties[i]); + } + ASSERT_GT(time_spent_deleting, expected_penlty * 0.9); + ASSERT_LT(time_spent_deleting, expected_penlty * 1.1); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, RateLimitedWALDelete) { + Destroy(last_options_); + + std::vector<uint64_t> penalties; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::BackgroundEmptyTrash:Wait", + [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); }); + + env_->no_slowdown_ = true; + env_->time_elapse_only_sleep_ = true; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.env = env_; + + int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec + Status s; + options.sst_file_manager.reset( + NewSstFileManager(env_, nullptr, "", 0, false, &s, 0)); + ASSERT_OK(s); + options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec); + auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get()); + sfm->delete_scheduler()->SetMaxTrashDBRatio(2.1); + + ASSERT_OK(TryReopen(options)); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Create 4 files in L0 + for (char v = 'a'; v <= 'd'; v++) { + ASSERT_OK(Put("Key2", DummyString(1024, v))); + ASSERT_OK(Put("Key3", DummyString(1024, v))); + ASSERT_OK(Put("Key4", DummyString(1024, v))); + ASSERT_OK(Put("Key1", DummyString(1024, v))); + ASSERT_OK(Put("Key4", DummyString(1024, v))); + ASSERT_OK(Flush()); + } + // We created 4 sst files in L0 + ASSERT_EQ("4", FilesPerLevel(0)); + + // Compaction will move the 4 files in L0 to trash and create 1 L1 file + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + sfm->WaitForEmptyTrash(); + ASSERT_EQ(penalties.size(), 8); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, OpenDBWithExistingTrash) { + Options options = CurrentOptions(); + + options.sst_file_manager.reset( + NewSstFileManager(env_, nullptr, "", 1024 * 1024 /* 1 MB/sec */)); + auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get()); + + Destroy(last_options_); + + // Add some trash files to the db directory so the DB can clean them up + env_->CreateDirIfMissing(dbname_); + ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash")); + ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash")); + ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash")); + + // Reopen the DB and verify that it deletes existing trash files + ASSERT_OK(TryReopen(options)); + sfm->WaitForEmptyTrash(); + ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash")); + ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash")); + ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash")); +} + + +// Create a DB with 2 db_paths, and generate multiple files in the 2 +// db_paths using CompactRangeOptions, make sure that files that were +// deleted from first db_path were deleted using DeleteScheduler and +// files in the second path were not. +TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) { + std::atomic<int> bg_delete_file(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::DeleteTrashFile:DeleteFile", + [&](void* /*arg*/) { bg_delete_file++; }); + // The deletion scheduler sometimes skips marking file as trash according to + // a heuristic. In that case the deletion will go through the below SyncPoint. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::DeleteFile", + [&](void* /*arg*/) { bg_delete_file++; }); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.db_paths.emplace_back(dbname_, 1024 * 100); + options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100); + options.env = env_; + + int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec + Status s; + options.sst_file_manager.reset( + NewSstFileManager(env_, nullptr, "", rate_bytes_per_sec, false, &s, + /* max_trash_db_ratio= */ 1.1)); + + ASSERT_OK(s); + auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get()); + + DestroyAndReopen(options); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions wo; + wo.disableWAL = true; + + // Create 4 files in L0 + for (int i = 0; i < 4; i++) { + ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo)); + ASSERT_OK(Flush()); + } + // We created 4 sst files in L0 + ASSERT_EQ("4", FilesPerLevel(0)); + // Compaction will delete files from L0 in first db path and generate a new + // file in L1 in second db path + CompactRangeOptions compact_options; + compact_options.target_path_id = 1; + Slice begin("Key0"); + Slice end("Key3"); + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + // Create 4 files in L0 + for (int i = 4; i < 8; i++) { + ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo)); + ASSERT_OK(Flush()); + } + ASSERT_EQ("4,1", FilesPerLevel(0)); + + // Compaction will delete files from L0 in first db path and generate a new + // file in L1 in second db path + begin = "Key4"; + end = "Key7"; + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + ASSERT_EQ("0,2", FilesPerLevel(0)); + + sfm->WaitForEmptyTrash(); + ASSERT_EQ(bg_delete_file, 8); + + // Compaction will delete both files and regenerate a file in L1 in second + // db path. The deleted files should still be cleaned up via delete scheduler. + compact_options.bottommost_level_compaction = + BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + sfm->WaitForEmptyTrash(); + ASSERT_EQ(bg_delete_file, 10); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) { + int bg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::DeleteTrashFile:DeleteFile", + [&](void* /*arg*/) { bg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Status s; + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.env = env_; + options.sst_file_manager.reset( + NewSstFileManager(env_, nullptr, "", 0, false, &s, 0)); + ASSERT_OK(s); + DestroyAndReopen(options); + + // Create 4 files in L0 + for (int i = 0; i < 4; i++) { + ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'))); + ASSERT_OK(Flush()); + } + // We created 4 sst files in L0 + ASSERT_EQ("4", FilesPerLevel(0)); + + // Close DB and destroy it using DeleteScheduler + Close(); + + int num_sst_files = 0; + int num_wal_files = 0; + std::vector<std::string> db_files; + env_->GetChildren(dbname_, &db_files); + for (std::string f : db_files) { + if (f.substr(f.find_last_of(".") + 1) == "sst") { + num_sst_files++; + } else if (f.substr(f.find_last_of(".") + 1) == "log") { + num_wal_files++; + } + } + ASSERT_GT(num_sst_files, 0); + ASSERT_GT(num_wal_files, 0); + + auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get()); + + sfm->SetDeleteRateBytesPerSecond(1024 * 1024); + sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1); + ASSERT_OK(DestroyDB(dbname_, options)); + sfm->WaitForEmptyTrash(); + ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files); +} + +TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) { + std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + Random rnd(301); + + // Generate a file containing 100 keys. + for (int i = 0; i < 100; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + uint64_t first_file_size = 0; + auto files_in_db = GetAllSSTFiles(&first_file_size); + ASSERT_EQ(sfm->GetTotalSize(), first_file_size); + + // Set the maximum allowed space usage to the current total size + sfm->SetMaxAllowedSpaceUsage(first_file_size + 1); + + ASSERT_OK(Put("key1", "val1")); + // This flush will cause bg_error_ and will fail + ASSERT_NOK(Flush()); +} + +TEST_F(DBSSTTest, CancellingCompactionsWorks) { + std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.level0_file_num_compaction_trigger = 2; + options.statistics = CreateDBStatistics(); + DestroyAndReopen(options); + + int completed_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* /*arg*/) { + sfm->SetMaxAllowedSpaceUsage(0); + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", + [&](void* /*arg*/) { completed_compactions++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + + // Generate a file containing 10 keys. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + uint64_t total_file_size = 0; + auto files_in_db = GetAllSSTFiles(&total_file_size); + // Set the maximum allowed space usage to the current total size + sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1); + + // Generate another file to trigger compaction. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForCompact(true); + + // Because we set a callback in CancelledCompaction, we actually + // let the compaction run + ASSERT_GT(completed_compactions, 0); + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + // Make sure the stat is bumped + ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, CancellingManualCompactionsWorks) { + std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.statistics = CreateDBStatistics(); + + FlushedFileCollector* collector = new FlushedFileCollector(); + options.listeners.emplace_back(collector); + + DestroyAndReopen(options); + + Random rnd(301); + + // Generate a file containing 10 keys. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + uint64_t total_file_size = 0; + auto files_in_db = GetAllSSTFiles(&total_file_size); + // Set the maximum allowed space usage to the current total size + sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1); + + // Generate another file to trigger compaction. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + // OK, now trigger a manual compaction + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Wait for manual compaction to get scheduled and finish + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + // Make sure the stat is bumped + ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount( + COMPACTION_CANCELLED), + 1); + + // Now make sure CompactFiles also gets cancelled + auto l0_files = collector->GetFlushedFiles(); + dbfull()->CompactFiles(rocksdb::CompactionOptions(), l0_files, 0); + + // Wait for manual compaction to get scheduled and finish + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount( + COMPACTION_CANCELLED), + 2); + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + + // Now let the flush through and make sure GetCompactionsReservedSize + // returns to normal + sfm->SetMaxAllowedSpaceUsage(0); + int completed_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + dbfull()->CompactFiles(rocksdb::CompactionOptions(), l0_files, 0); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + ASSERT_GT(completed_compactions, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { + // This test will set a maximum allowed space for the DB, then it will + // keep filling the DB until the limit is reached and bg_error_ is set. + // When bg_error_ is set we will verify that the DB size is greater + // than the limit. + + std::vector<int> max_space_limits_mbs = {1, 10}; + std::atomic<bool> bg_error_set(false); + + std::atomic<int> reached_max_space_on_flush(0); + std::atomic<int> reached_max_space_on_compaction(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", + [&](void* arg) { + Status* bg_error = static_cast<Status*>(arg); + bg_error_set = true; + reached_max_space_on_flush++; + // clear error to ensure compaction callback is called + *bg_error = Status::OK(); + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) { + bool* enough_room = static_cast<bool*>(arg); + *enough_room = true; + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached", + [&](void* /*arg*/) { + bg_error_set = true; + reached_max_space_on_compaction++; + }); + + for (auto limit_mb : max_space_limits_mbs) { + bg_error_set = false; + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.write_buffer_size = 1024 * 512; // 512 Kb + DestroyAndReopen(options); + Random rnd(301); + + sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024); + + // It is easy to detect if the test is stuck in a loop. No need for + // complex termination logic. + while (true) { + auto s = Put(RandomString(&rnd, 10), RandomString(&rnd, 50)); + if (!s.ok()) { + break; + } + } + ASSERT_TRUE(bg_error_set); + uint64_t total_sst_files_size = 0; + GetAllSSTFiles(&total_sst_files_size); + ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } + + ASSERT_GT(reached_max_space_on_flush, 0); + ASSERT_GT(reached_max_space_on_compaction, 0); +} + +TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) { + // Open DB with infinite max open files + // - First iteration use 1 thread to open files + // - Second iteration use 5 threads to open files + for (int iter = 0; iter < 2; iter++) { + Options options; + options.create_if_missing = true; + options.write_buffer_size = 100000; + options.disable_auto_compactions = true; + options.max_open_files = -1; + if (iter == 0) { + options.max_file_opening_threads = 1; + } else { + options.max_file_opening_threads = 5; + } + options = CurrentOptions(options); + DestroyAndReopen(options); + + // Create 12 Files in L0 (then move then to L2) + for (int i = 0; i < 12; i++) { + std::string k = "L2_" + Key(i); + ASSERT_OK(Put(k, k + std::string(1000, 'a'))); + ASSERT_OK(Flush()); + } + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + db_->CompactRange(compact_options, nullptr, nullptr); + + // Create 12 Files in L0 + for (int i = 0; i < 12; i++) { + std::string k = "L0_" + Key(i); + ASSERT_OK(Put(k, k + std::string(1000, 'a'))); + ASSERT_OK(Flush()); + } + Close(); + + // Reopening the DB will load all existing files + Reopen(options); + ASSERT_EQ("12,0,12", FilesPerLevel(0)); + std::vector<std::vector<FileMetaData>> files; + dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files); + + for (const auto& level : files) { + for (const auto& file : level) { + ASSERT_TRUE(file.table_reader_handle != nullptr); + } + } + + for (int i = 0; i < 12; i++) { + ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a')); + ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a')); + } + } +} + +TEST_F(DBSSTTest, GetTotalSstFilesSize) { + // We don't propagate oldest-key-time table property on compaction and + // just write 0 as default value. This affect the exact table size, since + // we encode table properties as varint64. Force time to be 0 to work around + // it. Should remove the workaround after we propagate the property on + // compaction. + std::unique_ptr<MockTimeEnv> mock_env(new MockTimeEnv(Env::Default())); + mock_env->set_current_time(0); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.compression = kNoCompression; + options.env = mock_env.get(); + DestroyAndReopen(options); + // Generate 5 files in L0 + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 10; j++) { + std::string val = "val_file_" + ToString(i); + ASSERT_OK(Put(Key(j), val)); + } + Flush(); + } + ASSERT_EQ("5", FilesPerLevel(0)); + + std::vector<LiveFileMetaData> live_files_meta; + dbfull()->GetLiveFilesMetaData(&live_files_meta); + ASSERT_EQ(live_files_meta.size(), 5); + uint64_t single_file_size = live_files_meta[0].size; + + uint64_t live_sst_files_size = 0; + uint64_t total_sst_files_size = 0; + for (const auto& file_meta : live_files_meta) { + live_sst_files_size += file_meta.size; + } + + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 5 + // Total SST files = 5 + ASSERT_EQ(live_sst_files_size, 5 * single_file_size); + ASSERT_EQ(total_sst_files_size, 5 * single_file_size); + + // hold current version + std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions())); + + // Compact 5 files into 1 file in L0 + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + live_files_meta.clear(); + dbfull()->GetLiveFilesMetaData(&live_files_meta); + ASSERT_EQ(live_files_meta.size(), 1); + + live_sst_files_size = 0; + total_sst_files_size = 0; + for (const auto& file_meta : live_files_meta) { + live_sst_files_size += file_meta.size; + } + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 1 (compacted file) + // Total SST files = 6 (5 original files + compacted file) + ASSERT_EQ(live_sst_files_size, 1 * single_file_size); + ASSERT_EQ(total_sst_files_size, 6 * single_file_size); + + // hold current version + std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions())); + + // Delete all keys and compact, this will delete all live files + for (int i = 0; i < 10; i++) { + ASSERT_OK(Delete(Key(i))); + } + Flush(); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("", FilesPerLevel(0)); + + live_files_meta.clear(); + dbfull()->GetLiveFilesMetaData(&live_files_meta); + ASSERT_EQ(live_files_meta.size(), 0); + + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 0 + // Total SST files = 6 (5 original files + compacted file) + ASSERT_EQ(total_sst_files_size, 6 * single_file_size); + + iter1.reset(); + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 0 + // Total SST files = 1 (compacted file) + ASSERT_EQ(total_sst_files_size, 1 * single_file_size); + + iter2.reset(); + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 0 + // Total SST files = 0 + ASSERT_EQ(total_sst_files_size, 0); + + // Close db before mock_env destruct. + Close(); +} + +TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.compression = kNoCompression; + DestroyAndReopen(options); + // Generate 5 files in L0 + for (int i = 0; i < 5; i++) { + ASSERT_OK(Put(Key(i), "val")); + Flush(); + } + ASSERT_EQ("5", FilesPerLevel(0)); + + std::vector<LiveFileMetaData> live_files_meta; + dbfull()->GetLiveFilesMetaData(&live_files_meta); + ASSERT_EQ(live_files_meta.size(), 5); + uint64_t single_file_size = live_files_meta[0].size; + + uint64_t live_sst_files_size = 0; + uint64_t total_sst_files_size = 0; + for (const auto& file_meta : live_files_meta) { + live_sst_files_size += file_meta.size; + } + + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + + // Live SST files = 5 + // Total SST files = 5 + ASSERT_EQ(live_sst_files_size, 5 * single_file_size); + ASSERT_EQ(total_sst_files_size, 5 * single_file_size); + + // hold current version + std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions())); + + // Compaction will do trivial move from L0 to L1 + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,5", FilesPerLevel(0)); + + live_files_meta.clear(); + dbfull()->GetLiveFilesMetaData(&live_files_meta); + ASSERT_EQ(live_files_meta.size(), 5); + + live_sst_files_size = 0; + total_sst_files_size = 0; + for (const auto& file_meta : live_files_meta) { + live_sst_files_size += file_meta.size; + } + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 5 + // Total SST files = 5 (used in 2 version) + ASSERT_EQ(live_sst_files_size, 5 * single_file_size); + ASSERT_EQ(total_sst_files_size, 5 * single_file_size); + + // hold current version + std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions())); + + // Delete all keys and compact, this will delete all live files + for (int i = 0; i < 5; i++) { + ASSERT_OK(Delete(Key(i))); + } + Flush(); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("", FilesPerLevel(0)); + + live_files_meta.clear(); + dbfull()->GetLiveFilesMetaData(&live_files_meta); + ASSERT_EQ(live_files_meta.size(), 0); + + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 0 + // Total SST files = 5 (used in 2 version) + ASSERT_EQ(total_sst_files_size, 5 * single_file_size); + + iter1.reset(); + iter2.reset(); + + ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", + &total_sst_files_size)); + // Live SST files = 0 + // Total SST files = 0 + ASSERT_EQ(total_sst_files_size, 0); +} + +#endif // ROCKSDB_LITE + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |