diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/obsolete_files_test.cc | 328 |
1 file changed, 328 insertions, 0 deletions
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef ROCKSDB_LITE

#include <stdlib.h>

#include <algorithm>
#include <map>
#include <string>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "file/filename.h"
#include "port/stack_trace.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/transaction_log.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

// Test fixture for DBImpl's obsolete-file tracking and purging
// (FindObsoleteFiles / PurgeObsoleteFiles). WAL files are kept in a
// dedicated directory (wal_dir_) so CheckFileTypeCounts() can count them
// separately from SSTs and MANIFESTs.
class ObsoleteFilesTest : public DBTestBase {
 public:
  ObsoleteFilesTest()
      : DBTestBase("obsolete_files_test", /*env_do_fsync=*/true),
        wal_dir_(dbname_ + "/wal_files") {}

  // Writes `numkeys` key/value pairs with keys in
  // [startkey, startkey + numkeys). Key and value are both the decimal
  // string of the index; sync is disabled to keep the writes cheap.
  void AddKeys(int numkeys, int startkey) {
    WriteOptions options;
    options.sync = false;
    for (int i = startkey; i < (numkeys + startkey); i++) {
      std::string temp = std::to_string(i);
      Slice key(temp);
      Slice value(temp);
      ASSERT_OK(db_->Put(options, key, value));
    }
  }

  // Flushes `numFiles` memtables, each filled with `numKeysPerFile` fresh
  // keys, and waits for background work to finish after each flush. With
  // level0_file_num_compaction_trigger == 2 (set in ReopenDB) this also
  // kicks off a compaction, which is what makes files obsolete.
  void createLevel0Files(int numFiles, int numKeysPerFile) {
    int startKey = 0;
    for (int i = 0; i < numFiles; i++) {
      AddKeys(numKeysPerFile, startKey);
      startKey += numKeysPerFile;
      ASSERT_OK(dbfull()->TEST_FlushMemTable());
      ASSERT_OK(
          dbfull()->TEST_WaitForCompact());  // wait for background flush (flush
                                             // is also a kind of compaction).
    }
  }

  // Scans `dir` and asserts it contains exactly `required_log` WAL files,
  // `required_sst` table files, and `required_manifest` MANIFEST files.
  // File types are classified via ParseFileName(); unparseable names are
  // ignored.
  void CheckFileTypeCounts(const std::string& dir, int required_log,
                           int required_sst, int required_manifest) {
    std::vector<std::string> filenames;
    ASSERT_OK(env_->GetChildren(dir, &filenames));

    int log_cnt = 0;
    int sst_cnt = 0;
    int manifest_cnt = 0;
    for (auto file : filenames) {
      uint64_t number;
      FileType type;
      if (ParseFileName(file, &number, &type)) {
        log_cnt += (type == kWalFile);
        sst_cnt += (type == kTableFile);
        manifest_cnt += (type == kDescriptorFile);
      }
    }
    ASSERT_EQ(required_log, log_cnt);
    ASSERT_EQ(required_sst, sst_cnt);
    ASSERT_EQ(required_manifest, manifest_cnt);
  }

  // Destroys any existing DB and reopens it with options tuned for these
  // tests: auto-compaction triggers at two L0 files, obsolete files are
  // always fully purged (period 0), write buffers/level sizes are huge so
  // only the explicit flushes create files, and WAL archiving is enabled
  // so log files can be checked.
  void ReopenDB() {
    Options options = CurrentOptions();
    // Trigger compaction when the number of level 0 files reaches 2.
    options.create_if_missing = true;
    options.level0_file_num_compaction_trigger = 2;
    options.disable_auto_compactions = false;
    options.delete_obsolete_files_period_micros = 0;  // always do full purge
    options.enable_thread_tracking = true;
    options.write_buffer_size = 1024 * 1024 * 1000;
    options.target_file_size_base = 1024 * 1024 * 1000;
    options.max_bytes_for_level_base = 1024 * 1024 * 1000;
    options.WAL_ttl_seconds = 300;     // Used to test log files
    options.WAL_size_limit_MB = 1024;  // Used to test log files
    options.wal_dir = wal_dir_;

    // Note: the following prevents an otherwise harmless data race between the
    // test setup code (AddBlobFile) in ObsoleteFilesTest.BlobFiles and the
    // periodic stat dumping thread.
    options.stats_dump_period_sec = 0;

    Destroy(options);
    Reopen(options);
  }

  const std::string wal_dir_;
};

// Races a foreground FindObsoleteFiles/PurgeObsoleteFiles pass against the
// background compaction's own obsolete-file handling. The LoadDependency
// pairs force the user thread's scan to start only after the background
// compaction has found obsolete files, and its purge to start only after
// the background purge finished. The callbacks assert that every deletion
// succeeds and that no file is still grabbed for purge at close.
TEST_F(ObsoleteFilesTest, RaceForObsoleteFileDeletion) {
  ReopenDB();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::BackgroundCallCompaction:FoundObsoleteFiles",
       "ObsoleteFilesTest::RaceForObsoleteFileDeletion:1"},
      {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
       "ObsoleteFilesTest::RaceForObsoleteFileDeletion:2"},
  });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DeleteObsoleteFileImpl:AfterDeletion", [&](void* arg) {
        Status* p_status = reinterpret_cast<Status*>(arg);
        ASSERT_OK(*p_status);
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::CloseHelper:PendingPurgeFinished", [&](void* arg) {
        std::unordered_set<uint64_t>* files_grabbed_for_purge_ptr =
            reinterpret_cast<std::unordered_set<uint64_t>*>(arg);
        ASSERT_TRUE(files_grabbed_for_purge_ptr->empty());
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Two L0 files trip the compaction trigger set up in ReopenDB().
  createLevel0Files(2, 50000);
  CheckFileTypeCounts(wal_dir_, 1, 0, 0);

  port::Thread user_thread([this]() {
    JobContext jobCxt(0);
    TEST_SYNC_POINT("ObsoleteFilesTest::RaceForObsoleteFileDeletion:1");
    // FindObsoleteFiles must run under the DB mutex.
    dbfull()->TEST_LockMutex();
    dbfull()->FindObsoleteFiles(&jobCxt, true /* force=true */,
                                false /* no_full_scan=false */);
    dbfull()->TEST_UnlockMutex();
    TEST_SYNC_POINT("ObsoleteFilesTest::RaceForObsoleteFileDeletion:2");
    dbfull()->PurgeObsoleteFiles(jobCxt);
    jobCxt.Clean();
  });

  user_thread.join();
}

// Verifies that obsolete OPTIONS files are cleaned up: after four
// SetOptions() calls (each of which writes a new OPTIONS file) performed
// while file deletions are disabled, re-enabling deletions and closing the
// DB must leave exactly two OPTIONS files on disk (current + one backup —
// presumably the retention policy; confirmed only by the final count here).
TEST_F(ObsoleteFilesTest, DeleteObsoleteOptionsFile) {
  ReopenDB();

  createLevel0Files(2, 50000);
  CheckFileTypeCounts(wal_dir_, 1, 0, 0);

  ASSERT_OK(dbfull()->DisableFileDeletions());
  for (int i = 0; i != 4; ++i) {
    // Toggle an option back and forth just to force new OPTIONS files.
    if (i % 2) {
      ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
                                     {{"paranoid_file_checks", "false"}}));
    } else {
      ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
                                     {{"paranoid_file_checks", "true"}}));
    }
  }
  ASSERT_OK(dbfull()->EnableFileDeletions(true /* force */));

  Close();

  // Count the OPTIONS files left in the DB directory.
  std::vector<std::string> files;
  int opts_file_count = 0;
  ASSERT_OK(env_->GetChildren(dbname_, &files));
  for (const auto& file : files) {
    uint64_t file_num;
    Slice dummy_info_log_name_prefix;
    FileType type;
    WalFileType log_type;
    if (ParseFileName(file, &file_num, dummy_info_log_name_prefix, &type,
                      &log_type) &&
        type == kOptionsFile) {
      opts_file_count++;
    }
  }
  ASSERT_EQ(2, opts_file_count);
}

// Exercises obsolete-file handling for blob files: an obsolete blob file
// must be found, grabbed for purge, and deleted; a live blob file and a
// file at/above min_pending_output must survive a full-scan purge.
TEST_F(ObsoleteFilesTest, BlobFiles) {
  ReopenDB();

  VersionSet* const versions = dbfull()->GetVersionSet();
  assert(versions);
  assert(versions->GetColumnFamilySet());

  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  assert(cfd);

  const ImmutableCFOptions* const ioptions = cfd->ioptions();
  assert(ioptions);
  assert(!ioptions->cf_paths.empty());

  const std::string& path = ioptions->cf_paths.front().path;

  // Add an obsolete blob file.
  constexpr uint64_t first_blob_file_number = 234;
  versions->AddObsoleteBlobFile(first_blob_file_number, path);

  // Add a live blob file.
  Version* const version = cfd->current();
  assert(version);

  VersionStorageInfo* const storage_info = version->storage_info();
  assert(storage_info);

  constexpr uint64_t second_blob_file_number = 456;
  constexpr uint64_t second_total_blob_count = 100;
  constexpr uint64_t second_total_blob_bytes = 2000000;
  constexpr char second_checksum_method[] = "CRC32B";
  constexpr char second_checksum_value[] = "\x6d\xbd\xf2\x3a";

  auto shared_meta = SharedBlobFileMetaData::Create(
      second_blob_file_number, second_total_blob_count, second_total_blob_bytes,
      second_checksum_method, second_checksum_value);

  constexpr uint64_t second_garbage_blob_count = 0;
  constexpr uint64_t second_garbage_blob_bytes = 0;

  auto meta = BlobFileMetaData::Create(
      std::move(shared_meta), BlobFileMetaData::LinkedSsts(),
      second_garbage_blob_count, second_garbage_blob_bytes);

  storage_info->AddBlobFile(std::move(meta));

  // Check for obsolete files and make sure the first blob file is picked up
  // and grabbed for purge. The second blob file should be on the live list.
  constexpr int job_id = 0;
  JobContext job_context{job_id};

  dbfull()->TEST_LockMutex();
  constexpr bool force_full_scan = false;
  dbfull()->FindObsoleteFiles(&job_context, force_full_scan);
  dbfull()->TEST_UnlockMutex();

  ASSERT_TRUE(job_context.HaveSomethingToDelete());
  ASSERT_EQ(job_context.blob_delete_files.size(), 1);
  ASSERT_EQ(job_context.blob_delete_files[0].GetBlobFileNumber(),
            first_blob_file_number);

  const auto& files_grabbed_for_purge =
      dbfull()->TEST_GetFilesGrabbedForPurge();
  ASSERT_NE(files_grabbed_for_purge.find(first_blob_file_number),
            files_grabbed_for_purge.end());

  ASSERT_EQ(job_context.blob_live.size(), 1);
  ASSERT_EQ(job_context.blob_live[0], second_blob_file_number);

  // Hack the job context a bit by adding a few files to the full scan
  // list and adjusting the pending file number. We add the two files
  // above as well as two additional ones, where one is old
  // and should be cleaned up, and the other is still pending.
  constexpr uint64_t old_blob_file_number = 123;
  constexpr uint64_t pending_blob_file_number = 567;

  job_context.full_scan_candidate_files.emplace_back(
      BlobFileName(old_blob_file_number), path);
  job_context.full_scan_candidate_files.emplace_back(
      BlobFileName(first_blob_file_number), path);
  job_context.full_scan_candidate_files.emplace_back(
      BlobFileName(second_blob_file_number), path);
  job_context.full_scan_candidate_files.emplace_back(
      BlobFileName(pending_blob_file_number), path);

  job_context.min_pending_output = pending_blob_file_number;

  // Purge obsolete files and make sure we purge the old file and the first
  // file (and keep the second file and the pending file). Deletions are
  // observed via the BeforeDeletion sync point, filtering on the ".blob"
  // extension.
  std::vector<std::string> deleted_files;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion", [&](void* arg) {
        const std::string* file = static_cast<std::string*>(arg);
        assert(file);

        constexpr char blob_extension[] = ".blob";

        if (file->find(blob_extension) != std::string::npos) {
          deleted_files.emplace_back(*file);
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  dbfull()->PurgeObsoleteFiles(job_context);
  job_context.Clean();

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // The purged file must no longer be tracked as grabbed for purge.
  ASSERT_EQ(files_grabbed_for_purge.find(first_blob_file_number),
            files_grabbed_for_purge.end());

  // Deletion order is not deterministic; sort before comparing.
  std::sort(deleted_files.begin(), deleted_files.end());
  const std::vector<std::string> expected_deleted_files{
      BlobFileName(path, old_blob_file_number),
      BlobFileName(path, first_blob_file_number)};

  ASSERT_EQ(deleted_files, expected_deleted_files);
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

// These tests exercise DBImpl internals that are compiled out of
// ROCKSDB_LITE builds, so the whole suite is skipped there.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr,
          "SKIPPED as DBImpl::DeleteFile is not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE