diff options
Diffstat (limited to 'src/rocksdb/db/db_wal_test.cc')
-rw-r--r-- | src/rocksdb/db/db_wal_test.cc | 2314 |
1 files changed, 2314 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_wal_test.cc b/src/rocksdb/db/db_wal_test.cc new file mode 100644 index 000000000..5b5ec76af --- /dev/null +++ b/src/rocksdb/db/db_wal_test.cc @@ -0,0 +1,2314 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_test_util.h" +#include "options/options_helper.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "rocksdb/file_system.h" +#include "test_util/sync_point.h" +#include "utilities/fault_injection_env.h" +#include "utilities/fault_injection_fs.h" + +namespace ROCKSDB_NAMESPACE { +class DBWALTestBase : public DBTestBase { + protected: + explicit DBWALTestBase(const std::string& dir_name) + : DBTestBase(dir_name, /*env_do_fsync=*/true) {} + +#if defined(ROCKSDB_PLATFORM_POSIX) + public: +#if defined(ROCKSDB_FALLOCATE_PRESENT) + bool IsFallocateSupported() { + // Test fallocate support of running file system. + // Skip this test if fallocate is not supported. + std::string fname_test_fallocate = dbname_ + "/preallocate_testfile"; + int fd = -1; + do { + fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + } while (fd < 0 && errno == EINTR); + assert(fd > 0); + int alloc_status = fallocate(fd, 0, 0, 1); + int err_number = errno; + close(fd); + assert(env_->DeleteFile(fname_test_fallocate) == Status::OK()); + if (err_number == ENOSYS || err_number == EOPNOTSUPP) { + fprintf(stderr, "Skipped preallocated space check: %s\n", + errnoStr(err_number).c_str()); + return false; + } + assert(alloc_status == 0); + return true; + } +#endif // ROCKSDB_FALLOCATE_PRESENT + + uint64_t GetAllocatedFileSize(std::string file_name) { + struct stat sbuf; + int err = stat(file_name.c_str(), &sbuf); + assert(err == 0); + return sbuf.st_blocks * 512; + } +#endif // ROCKSDB_PLATFORM_POSIX +}; + +class DBWALTest : public DBWALTestBase { + public: + DBWALTest() : DBWALTestBase("/db_wal_test") {} +}; + +// A SpecialEnv enriched to give more insight about deleted files +class EnrichedSpecialEnv : public SpecialEnv { + public: + explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {} + Status NewSequentialFile(const std::string& f, + std::unique_ptr<SequentialFile>* r, + const EnvOptions& soptions) override { + InstrumentedMutexLock l(&env_mutex_); + if (f == skipped_wal) { + deleted_wal_reopened = true; + if (IsWAL(f) && largest_deleted_wal.size() != 0 && + f.compare(largest_deleted_wal) <= 0) { + gap_in_wals = true; + } + } + return SpecialEnv::NewSequentialFile(f, r, soptions); + } + Status DeleteFile(const std::string& fname) override { + if (IsWAL(fname)) { + deleted_wal_cnt++; + InstrumentedMutexLock l(&env_mutex_); + // If this is the first WAL, remember its name and skip deleting it. We + // remember its name partly because the application might attempt to + // delete the file again. + if (skipped_wal.size() != 0 && skipped_wal != fname) { + if (largest_deleted_wal.size() == 0 || + largest_deleted_wal.compare(fname) < 0) { + largest_deleted_wal = fname; + } + } else { + skipped_wal = fname; + return Status::OK(); + } + } + return SpecialEnv::DeleteFile(fname); + } + bool IsWAL(const std::string& fname) { + // printf("iswal %s\n", fname.c_str()); + return fname.compare(fname.size() - 3, 3, "log") == 0; + } + + InstrumentedMutex env_mutex_; + // the wal whose actual delete was skipped by the env + std::string skipped_wal = ""; + // the largest WAL that was requested to be deleted + std::string largest_deleted_wal = ""; + // number of WALs that were successfully deleted + std::atomic<size_t> deleted_wal_cnt = {0}; + // the WAL whose delete from fs was skipped is reopened during recovery + std::atomic<bool> deleted_wal_reopened = {false}; + // whether a gap in the WALs was detected during recovery + std::atomic<bool> gap_in_wals = {false}; +}; + +class DBWALTestWithEnrichedEnv : public DBTestBase { + public: + DBWALTestWithEnrichedEnv() + : DBTestBase("db_wal_test", /*env_do_fsync=*/true) { + enriched_env_ = new EnrichedSpecialEnv(env_->target()); + auto options = CurrentOptions(); + options.env = enriched_env_; + options.allow_2pc = true; + Reopen(options); + delete env_; + // to be deleted by the parent class + env_ = enriched_env_; + } + + protected: + EnrichedSpecialEnv* enriched_env_; +}; + +// Test that the recovery would successfully avoid the gaps between the logs. +// One known scenario that could cause this is that the application issue the +// WAL deletion out of order. For the sake of simplicity in the test, here we +// create the gap by manipulating the env to skip deletion of the first WAL but +// not the ones after it. +TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) { + auto options = last_options_; + // To cause frequent WAL deletion + options.write_buffer_size = 128; + Reopen(options); + + WriteOptions writeOpt = WriteOptions(); + for (int i = 0; i < 128 * 5; i++) { + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); + } + FlushOptions fo; + fo.wait = true; + ASSERT_OK(db_->Flush(fo)); + + // some wals are deleted + ASSERT_NE(0, enriched_env_->deleted_wal_cnt); + // but not the first one + ASSERT_NE(0, enriched_env_->skipped_wal.size()); + + // Test that the WAL that was not deleted will be skipped during recovery + options = last_options_; + Reopen(options); + ASSERT_FALSE(enriched_env_->deleted_wal_reopened); + ASSERT_FALSE(enriched_env_->gap_in_wals); +} + +TEST_F(DBWALTest, WAL) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v1", Get(1, "bar")); + + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + // Both value's should be present. + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v2", Get(1, "foo")); + + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + // again both values should be present. + ASSERT_EQ("v3", Get(1, "foo")); + ASSERT_EQ("v3", Get(1, "bar")); + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, RollLog) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "baz", "v5")); + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + for (int i = 0; i < 10; i++) { + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + } + ASSERT_OK(Put(1, "foo", "v4")); + for (int i = 0; i < 10; i++) { + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + } + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, SyncWALNotBlockWrite) { + Options options = CurrentOptions(); + options.max_write_buffer_number = 4; + DestroyAndReopen(options); + + ASSERT_OK(Put("foo1", "bar1")); + ASSERT_OK(Put("foo5", "bar5")); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"WritableFileWriter::SyncWithoutFlush:1", + "DBWALTest::SyncWALNotBlockWrite:1"}, + {"DBWALTest::SyncWALNotBlockWrite:2", + "WritableFileWriter::SyncWithoutFlush:2"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); }); + + TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1"); + ASSERT_OK(Put("foo2", "bar2")); + ASSERT_OK(Put("foo3", "bar3")); + FlushOptions fo; + fo.wait = false; + ASSERT_OK(db_->Flush(fo)); + ASSERT_OK(Put("foo4", "bar4")); + + TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2"); + + thread.join(); + + ASSERT_EQ(Get("foo1"), "bar1"); + ASSERT_EQ(Get("foo2"), "bar2"); + ASSERT_EQ(Get("foo3"), "bar3"); + ASSERT_EQ(Get("foo4"), "bar4"); + ASSERT_EQ(Get("foo5"), "bar5"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBWALTest, SyncWALNotWaitWrite) { + ASSERT_OK(Put("foo1", "bar1")); + ASSERT_OK(Put("foo3", "bar3")); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"}, + {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread thread( + [&]() { ASSERT_OK(Put("foo2", "bar2")); }); + // Moving this to SyncWAL before the actual fsync + // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); + ASSERT_OK(db_->SyncWAL()); + // Moving this to SyncWAL after actual fsync + // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); + + thread.join(); + + ASSERT_EQ(Get("foo1"), "bar1"); + ASSERT_EQ(Get("foo2"), "bar2"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBWALTest, Recover) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "baz", "v5")); + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v5", Get(1, "baz")); + ASSERT_OK(Put(1, "bar", "v2")); + ASSERT_OK(Put(1, "foo", "v3")); + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + ASSERT_EQ("v3", Get(1, "foo")); + ASSERT_OK(Put(1, "foo", "v4")); + ASSERT_EQ("v4", Get(1, "foo")); + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v5", Get(1, "baz")); + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, RecoverWithTableHandle) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.avoid_flush_during_recovery = false; + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "bar", "v2")); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(1, "foo", "v3")); + ASSERT_OK(Put(1, "bar", "v4")); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(1, "big", std::string(100, 'a'))); + + options = CurrentOptions(); + const int kSmallMaxOpenFiles = 13; + if (option_config_ == kDBLogDir) { + // Use this option to check not preloading files + // Set the max open files to be small enough so no preload will + // happen. + options.max_open_files = kSmallMaxOpenFiles; + // RocksDB sanitize max open files to at least 20. Modify it back. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = static_cast<int*>(arg); + *max_open_files = kSmallMaxOpenFiles; + }); + + } else if (option_config_ == kWalDirAndMmapReads) { + // Use this option to check always loading all files. + options.max_open_files = 100; + } else { + options.max_open_files = -1; + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + std::vector<std::vector<FileMetaData>> files; + dbfull()->TEST_GetFilesMetaData(handles_[1], &files); + size_t total_files = 0; + for (const auto& level : files) { + total_files += level.size(); + } + ASSERT_EQ(total_files, 3); + for (const auto& level : files) { + for (const auto& file : level) { + if (options.max_open_files == kSmallMaxOpenFiles) { + ASSERT_TRUE(file.table_reader_handle == nullptr); + } else { + ASSERT_TRUE(file.table_reader_handle != nullptr); + } + } + } + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, RecoverWithBlob) { + // Write a value that's below the prospective size limit for blobs and another + // one that's above. Note that blob files are not actually enabled at this + // point. + constexpr uint64_t min_blob_size = 10; + + constexpr char short_value[] = "short"; + static_assert(sizeof(short_value) - 1 < min_blob_size, + "short_value too long"); + + constexpr char long_value[] = "long_value"; + static_assert(sizeof(long_value) - 1 >= min_blob_size, + "long_value too short"); + + ASSERT_OK(Put("key1", short_value)); + ASSERT_OK(Put("key2", long_value)); + + // There should be no files just yet since we haven't flushed. + { + VersionSet* const versions = dbfull()->GetVersionSet(); + ASSERT_NE(versions, nullptr); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + + ASSERT_EQ(storage_info->num_non_empty_levels(), 0); + ASSERT_TRUE(storage_info->GetBlobFiles().empty()); + } + + // Reopen the database with blob files enabled. A new table file/blob file + // pair should be written during recovery. + Options options; + options.enable_blob_files = true; + options.min_blob_size = min_blob_size; + options.avoid_flush_during_recovery = false; + options.disable_auto_compactions = true; + options.env = env_; + + Reopen(options); + + ASSERT_EQ(Get("key1"), short_value); + ASSERT_EQ(Get("key2"), long_value); + + VersionSet* const versions = dbfull()->GetVersionSet(); + ASSERT_NE(versions, nullptr); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_EQ(l0_files.size(), 1); + + const FileMetaData* const table_file = l0_files[0]; + ASSERT_NE(table_file, nullptr); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + const auto& blob_file = blob_files.front(); + ASSERT_NE(blob_file, nullptr); + + ASSERT_EQ(table_file->smallest.user_key(), "key1"); + ASSERT_EQ(table_file->largest.user_key(), "key2"); + ASSERT_EQ(table_file->fd.smallest_seqno, 1); + ASSERT_EQ(table_file->fd.largest_seqno, 2); + ASSERT_EQ(table_file->oldest_blob_file_number, + blob_file->GetBlobFileNumber()); + + ASSERT_EQ(blob_file->GetTotalBlobCount(), 1); + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + ASSERT_NE(internal_stats, nullptr); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize()); + ASSERT_EQ(compaction_stats[0].bytes_written_blob, + blob_file->GetTotalBlobBytes()); + ASSERT_EQ(compaction_stats[0].num_output_files, 1); + ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1); + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], + compaction_stats[0].bytes_written + + compaction_stats[0].bytes_written_blob); +#endif // ROCKSDB_LITE +} + +TEST_F(DBWALTest, RecoverWithBlobMultiSST) { + // Write several large (4 KB) values without flushing. Note that blob files + // are not actually enabled at this point. + std::string large_value(1 << 12, 'a'); + + constexpr int num_keys = 64; + + for (int i = 0; i < num_keys; ++i) { + ASSERT_OK(Put(Key(i), large_value)); + } + + // There should be no files just yet since we haven't flushed. + { + VersionSet* const versions = dbfull()->GetVersionSet(); + ASSERT_NE(versions, nullptr); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + + ASSERT_EQ(storage_info->num_non_empty_levels(), 0); + ASSERT_TRUE(storage_info->GetBlobFiles().empty()); + } + + // Reopen the database with blob files enabled and write buffer size set to a + // smaller value. Multiple table files+blob files should be written and added + // to the Version during recovery. + Options options; + options.write_buffer_size = 1 << 16; // 64 KB + options.enable_blob_files = true; + options.avoid_flush_during_recovery = false; + options.disable_auto_compactions = true; + options.env = env_; + + Reopen(options); + + for (int i = 0; i < num_keys; ++i) { + ASSERT_EQ(Get(Key(i)), large_value); + } + + VersionSet* const versions = dbfull()->GetVersionSet(); + ASSERT_NE(versions, nullptr); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_GT(l0_files.size(), 1); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_GT(blob_files.size(), 1); + + ASSERT_EQ(l0_files.size(), blob_files.size()); +} + +TEST_F(DBWALTest, WALWithChecksumHandoff) { +#ifndef ROCKSDB_ASSERT_STATUS_CHECKED + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr<FaultInjectionTestFS> fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs)); + do { + Options options = CurrentOptions(); + + options.checksum_handoff_file_types.Add(FileType::kWalFile); + options.env = fault_fs_env.get(); + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + CreateAndReopenWithCF({"pikachu"}, options); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v1", Get(1, "bar")); + + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Both value's should be present. + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v2", Get(1, "foo")); + + writeOpt.disableWAL = true; + // This put, data is persisted by Flush + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + writeOpt.disableWAL = false; + // Data is persisted in the WAL + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3")); + // The hash does not match, write fails + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + writeOpt.disableWAL = false; + ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Due to the write failure, Get should not find + ASSERT_NE("v3", Get(1, "foo")); + ASSERT_EQ("v3", Get(1, "zoo")); + ASSERT_EQ("v3", Get(1, "bar")); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + // Each write will be similated as corrupted. + fault_fs->IngestDataCorruptionBeforeWrite(); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4")); + writeOpt.disableWAL = false; + ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_NE("v4", Get(1, "foo")); + ASSERT_NE("v4", Get(1, "bar")); + fault_fs->NoDataCorruptionBeforeWrite(); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + // The file system does not provide checksum method and verification. + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5")); + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v5", Get(1, "foo")); + ASSERT_EQ("v5", Get(1, "bar")); + + Destroy(options); + } while (ChangeWalOptions()); +#endif // ROCKSDB_ASSERT_STATUS_CHECKED +} + +class DBRecoveryTestBlobError + : public DBWALTest, + public testing::WithParamInterface<std::string> { + public: + DBRecoveryTestBlobError() : sync_point_(GetParam()) {} + + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError, + ::testing::ValuesIn(std::vector<std::string>{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) { + // Write a value. Note that blob files are not actually enabled at this point. + ASSERT_OK(Put("key", "blob")); + + // Reopen with blob files enabled but make blob file writing fail during + // recovery. + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast<Status*>(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Options options; + options.enable_blob_files = true; + options.avoid_flush_during_recovery = false; + options.disable_auto_compactions = true; + options.env = env_; + + ASSERT_NOK(TryReopen(options)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Make sure the files generated by the failed recovery have been deleted. + std::vector<std::string> files; + ASSERT_OK(env_->GetChildren(dbname_, &files)); + for (const auto& file : files) { + uint64_t number = 0; + FileType type = kTableFile; + + if (!ParseFileName(file, &number, &type)) { + continue; + } + + ASSERT_NE(type, kTableFile); + ASSERT_NE(type, kBlobFile); + } +} + +TEST_F(DBWALTest, IgnoreRecoveredLog) { + std::string backup_logs = dbname_ + "/backup_logs"; + + do { + // delete old files in backup_logs directory + ASSERT_OK(env_->CreateDirIfMissing(backup_logs)); + std::vector<std::string> old_files; + ASSERT_OK(env_->GetChildren(backup_logs, &old_files)); + for (auto& file : old_files) { + ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file)); + } + Options options = CurrentOptions(); + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + options.wal_dir = dbname_ + "/logs"; + DestroyAndReopen(options); + + // fill up the DB + std::string one, two; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); + ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one))); + + // copy the logs to backup + std::vector<std::string> logs; + ASSERT_OK(env_->GetChildren(options.wal_dir, &logs)); + for (auto& log : logs) { + CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log); + } + + // recover the DB + Reopen(options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + + // copy the logs from backup back to wal dir + for (auto& log : logs) { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + } + // this should ignore the log files, recovery should not happen again + // if the recovery happens, the same merge operator would be called twice, + // leading to incorrect results + Reopen(options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + Close(); + Destroy(options); + Reopen(options); + Close(); + + // copy the logs from backup back to wal dir + ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir)); + for (auto& log : logs) { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + } + // assert that we successfully recovered only from logs, even though we + // destroyed the DB + Reopen(options); + ASSERT_EQ(two, Get("foo")); + ASSERT_EQ(one, Get("bar")); + + // Recovery will fail if DB directory doesn't exist. + Destroy(options); + // copy the logs from backup back to wal dir + ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir)); + for (auto& log : logs) { + CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); + // we won't be needing this file no more + ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log)); + } + Status s = TryReopen(options); + ASSERT_NOK(s); + Destroy(options); + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, RecoveryWithEmptyLog) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "foo", "v2")); + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + ASSERT_OK(Put(1, "foo", "v3")); + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + ASSERT_EQ("v3", Get(1, "foo")); + } while (ChangeWalOptions()); +} + +#if !(defined NDEBUG) || !defined(OS_WIN) +TEST_F(DBWALTest, PreallocateBlock) { + Options options = CurrentOptions(); + options.write_buffer_size = 10 * 1000 * 1000; + options.max_total_wal_size = 0; + + size_t expected_preallocation_size = static_cast<size_t>( + options.write_buffer_size + options.write_buffer_size / 10); + + DestroyAndReopen(options); + + std::atomic<int> called(0); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { + ASSERT_TRUE(arg != nullptr); + size_t preallocation_size = *(static_cast<size_t*>(arg)); + ASSERT_EQ(expected_preallocation_size, preallocation_size); + called.fetch_add(1); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("", "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("", "")); + Close(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(2, called.load()); + + options.max_total_wal_size = 1000 * 1000; + expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size); + Reopen(options); + called.store(0); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { + ASSERT_TRUE(arg != nullptr); + size_t preallocation_size = *(static_cast<size_t*>(arg)); + ASSERT_EQ(expected_preallocation_size, preallocation_size); + called.fetch_add(1); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("", "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("", "")); + Close(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(2, called.load()); + + options.db_write_buffer_size = 800 * 1000; + expected_preallocation_size = + static_cast<size_t>(options.db_write_buffer_size); + Reopen(options); + called.store(0); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { + ASSERT_TRUE(arg != nullptr); + size_t preallocation_size = *(static_cast<size_t*>(arg)); + ASSERT_EQ(expected_preallocation_size, preallocation_size); + called.fetch_add(1); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("", "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("", "")); + Close(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(2, called.load()); + + expected_preallocation_size = 700 * 1000; + std::shared_ptr<WriteBufferManager> write_buffer_manager = + std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000)); + options.write_buffer_manager = write_buffer_manager; + Reopen(options); + called.store(0); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { + ASSERT_TRUE(arg != nullptr); + size_t preallocation_size = *(static_cast<size_t*>(arg)); + ASSERT_EQ(expected_preallocation_size, preallocation_size); + called.fetch_add(1); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("", "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("", "")); + Close(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(2, called.load()); +} +#endif // !(defined NDEBUG) || !defined(OS_WIN) + +#ifndef ROCKSDB_LITE +TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) { + // TODO(ajkr): Disabled until WAL recycling is fixed for + // `kPointInTimeRecovery`. + + // For github issue #1303 + for (int i = 0; i < 2; ++i) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.recycle_log_file_num = 2; + if (i != 0) { + options.wal_dir = alternative_wal_dir_; + } + + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "v1")); + VectorLogPtr log_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); + ASSERT_GT(log_files.size(), 0); + ASSERT_OK(Flush()); + + // Now the original WAL is in log_files[0] and should be marked for + // recycling. + // Verify full purge cannot remove this file. + JobContext job_context(0); + dbfull()->TEST_LockMutex(); + dbfull()->FindObsoleteFiles(&job_context, true /* force */); + dbfull()->TEST_UnlockMutex(); + dbfull()->PurgeObsoleteFiles(job_context); + + if (i == 0) { + ASSERT_OK( + env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber()))); + } else { + ASSERT_OK(env_->FileExists( + LogFileName(alternative_wal_dir_, log_files[0]->LogNumber()))); + } + } +} + +TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) { + // TODO(ajkr): Disabled until WAL recycling is fixed for + // `kPointInTimeRecovery`. + + // Ensures full purge cannot delete a WAL while it's in the process of being + // recycled. In particular, we force the full purge after a file has been + // chosen for reuse, but before it has been renamed. + for (int i = 0; i < 2; ++i) { + Options options = CurrentOptions(); + options.recycle_log_file_num = 1; + if (i != 0) { + options.wal_dir = alternative_wal_dir_; + } + DestroyAndReopen(options); + + // The first flush creates a second log so writes can continue before the + // flush finishes. + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + // The second flush can recycle the first log. Sync points enforce the + // full purge happens after choosing the log to recycle and before it is + // renamed. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::CreateWAL:BeforeReuseWritableFile1", + "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"}, + {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge", + "DBImpl::CreateWAL:BeforeReuseWritableFile2"}, + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::port::Thread thread([&]() { + TEST_SYNC_POINT( + "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"); + ASSERT_OK(db_->EnableFileDeletions(true)); + TEST_SYNC_POINT( + "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge"); + }); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + thread.join(); + } +} + +TEST_F(DBWALTest, GetSortedWalFiles) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + VectorLogPtr log_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); + ASSERT_EQ(0, log_files.size()); + + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); + ASSERT_EQ(1, log_files.size()); + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, GetCurrentWalFile) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + + std::unique_ptr<LogFile>* bad_log_file = nullptr; + ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file)); + + std::unique_ptr<LogFile> log_file; + ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); + + // nothing has been written to the log yet + ASSERT_EQ(log_file->StartSequence(), 0); + ASSERT_EQ(log_file->SizeFileBytes(), 0); + ASSERT_EQ(log_file->Type(), kAliveLogFile); + ASSERT_GT(log_file->LogNumber(), 0); + + // add some data and verify that the file size actually moves foward + ASSERT_OK(Put(0, "foo", "v1")); + ASSERT_OK(Put(0, "foo2", "v2")); + ASSERT_OK(Put(0, "foo3", "v3")); + + ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); + + ASSERT_EQ(log_file->StartSequence(), 0); + ASSERT_GT(log_file->SizeFileBytes(), 0); + ASSERT_EQ(log_file->Type(), kAliveLogFile); + ASSERT_GT(log_file->LogNumber(), 0); + + // force log files to cycle and add some more data, then check if + // log number moves forward + + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + for (int i = 0; i < 10; i++) { + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + } + + ASSERT_OK(Put(0, "foo4", "v4")); + ASSERT_OK(Put(0, "foo5", "v5")); + ASSERT_OK(Put(0, "foo6", "v6")); + + ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); + + ASSERT_EQ(log_file->StartSequence(), 0); + ASSERT_GT(log_file->SizeFileBytes(), 0); + ASSERT_EQ(log_file->Type(), kAliveLogFile); + ASSERT_GT(log_file->LogNumber(), 0); + + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) { + // Test for regression of WAL cleanup missing files that don't contain data + // for every column family. + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ASSERT_OK(Put(1, "foo", "v1")); + ASSERT_OK(Put(1, "foo", "v2")); + uint64_t earliest_log_nums[2]; + for (int i = 0; i < 2; ++i) { + if (i > 0) { + ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + } + VectorLogPtr log_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); + if (log_files.size() > 0) { + earliest_log_nums[i] = log_files[0]->LogNumber(); + } else { + earliest_log_nums[i] = std::numeric_limits<uint64_t>::max(); + } + } + // Check at least the first WAL was cleaned up during the recovery. + ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]); + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTest, RecoverWithLargeLog) { + do { + { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(Put(1, "big1", std::string(200000, '1'))); + ASSERT_OK(Put(1, "big2", std::string(200000, '2'))); + ASSERT_OK(Put(1, "small3", std::string(10, '3'))); + ASSERT_OK(Put(1, "small4", std::string(10, '4'))); + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + } + + // Make sure that if we re-open with a small write buffer size that + // we flush table files in the middle of a large log file. + Options options; + options.write_buffer_size = 100000; + options = CurrentOptions(options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3); + ASSERT_EQ(std::string(200000, '1'), Get(1, "big1")); + ASSERT_EQ(std::string(200000, '2'), Get(1, "big2")); + ASSERT_EQ(std::string(10, '3'), Get(1, "small3")); + ASSERT_EQ(std::string(10, '4'), Get(1, "small4")); + ASSERT_GT(NumTableFilesAtLevel(0, 1), 1); + } while (ChangeWalOptions()); +} + +// In https://reviews.facebook.net/D20661 we change +// recovery behavior: previously for each log file each column family +// memtable was flushed, even it was empty. Now it's changed: +// we try to create the smallest number of table files by merging +// updates from multiple logs +TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) { + Options options = CurrentOptions(); + options.write_buffer_size = 5000000; + CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); + + // Since we will reopen DB with smaller write_buffer_size, + // each key will go to new SST file + ASSERT_OK(Put(1, Key(10), DummyString(1000000))); + ASSERT_OK(Put(1, Key(10), DummyString(1000000))); + ASSERT_OK(Put(1, Key(10), DummyString(1000000))); + ASSERT_OK(Put(1, Key(10), DummyString(1000000))); + + ASSERT_OK(Put(3, Key(10), DummyString(1))); + // Make 'dobrynia' to be flushed and new WAL file to be created + ASSERT_OK(Put(2, Key(10), DummyString(7500000))); + ASSERT_OK(Put(2, Key(1), DummyString(1))); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2])); + { + auto tables = ListTableFiles(env_, dbname_); + ASSERT_EQ(tables.size(), static_cast<size_t>(1)); + // Make sure 'dobrynia' was flushed: check sst files amount + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast<uint64_t>(1)); + } + // New WAL file + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(3, Key(10), DummyString(1))); + ASSERT_OK(Put(3, Key(10), DummyString(1))); + ASSERT_OK(Put(3, Key(10), DummyString(1))); + + options.write_buffer_size = 4096; + options.arena_block_size = 4096; + ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, + options); + { + // No inserts => default is empty + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast<uint64_t>(0)); + // First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast<uint64_t>(5)); + // 1 SST for big key + 1 SST for small one + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast<uint64_t>(2)); + // 1 SST for all keys + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast<uint64_t>(1)); + } +} + +// In https://reviews.facebook.net/D20661 we change +// recovery behavior: previously for each log file each column family +// memtable was flushed, even it wasn't empty. Now it's changed: +// we try to create the smallest number of table files by merging +// updates from multiple logs +TEST_F(DBWALTest, RecoverCheckFileAmount) { + Options options = CurrentOptions(); + options.write_buffer_size = 100000; + options.arena_block_size = 4 * 1024; + options.avoid_flush_during_recovery = false; + CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); + + ASSERT_OK(Put(0, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(2, Key(1), DummyString(1))); + + // Make 'nikitich' memtable to be flushed + ASSERT_OK(Put(3, Key(10), DummyString(1002400))); + ASSERT_OK(Put(3, Key(1), DummyString(1))); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3])); + // 4 memtable are not flushed, 1 sst file + { + auto tables = ListTableFiles(env_, dbname_); + ASSERT_EQ(tables.size(), static_cast<size_t>(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast<uint64_t>(1)); + } + // Memtable for 'nikitich' has flushed, new WAL file has opened + // 4 memtable still not flushed + + // Write to new WAL file + ASSERT_OK(Put(0, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(2, Key(1), DummyString(1))); + + // Fill up 'nikitich' one more time + ASSERT_OK(Put(3, Key(10), DummyString(1002400))); + // make it flush + ASSERT_OK(Put(3, Key(1), DummyString(1))); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3])); + // There are still 4 memtable not flushed, and 2 sst tables + ASSERT_OK(Put(0, Key(1), DummyString(1))); + ASSERT_OK(Put(1, Key(1), DummyString(1))); + ASSERT_OK(Put(2, Key(1), DummyString(1))); + + { + auto tables = ListTableFiles(env_, dbname_); + ASSERT_EQ(tables.size(), static_cast<size_t>(2)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast<uint64_t>(2)); + } + + ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, + options); + { + std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_); + // Check, that records for 'default', 'dobrynia' and 'pikachu' from + // first, second and third WALs went to the same SST. + // So, there is 6 SSTs: three for 'nikitich', one for 'default', one for + // 'dobrynia', one for 'pikachu' + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast<uint64_t>(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast<uint64_t>(3)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast<uint64_t>(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast<uint64_t>(1)); + } +} + +TEST_F(DBWALTest, SyncMultipleLogs) { + const uint64_t kNumBatches = 2; + const int kBatchSize = 1000; + + Options options = CurrentOptions(); + options.create_if_missing = true; + options.write_buffer_size = 4096; + Reopen(options); + + WriteBatch batch; + WriteOptions wo; + wo.sync = true; + + for (uint64_t b = 0; b < kNumBatches; b++) { + batch.Clear(); + for (int i = 0; i < kBatchSize; i++) { + ASSERT_OK(batch.Put(Key(i), DummyString(128))); + } + + ASSERT_OK(dbfull()->Write(wo, &batch)); + } + + ASSERT_OK(dbfull()->SyncWAL()); +} + +// Github issue 1339. Prior the fix we read sequence id from the first log to +// a local variable, then keep increase the variable as we replay logs, +// ignoring actual sequence id of the records. This is incorrect if some writes +// come with WAL disabled. +TEST_F(DBWALTest, PartOfWritesWithWALDisabled) { + std::unique_ptr<FaultInjectionTestEnv> fault_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.env = fault_env.get(); + options.disable_auto_compactions = true; + WriteOptions wal_on, wal_off; + wal_on.sync = true; + wal_on.disableWAL = false; + wal_off.disableWAL = true; + CreateAndReopenWithCF({"dummy"}, options); + ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1 + ASSERT_OK(Put(1, "dummy", "d2", wal_off)); + ASSERT_OK(Put(1, "dummy", "d3", wal_off)); + ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4 + ASSERT_OK(Flush(0)); + ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5 + ASSERT_EQ("v5", Get(0, "key")); + ASSERT_OK(dbfull()->FlushWAL(false)); + // Simulate a crash. + fault_env->SetFilesystemActive(false); + Close(); + fault_env->ResetState(); + ReopenWithColumnFamilies({"default", "dummy"}, options); + // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3. + ASSERT_EQ("v5", Get(0, "key")); + // Destroy DB before destruct fault_env. + Destroy(options); +} + +// +// Test WAL recovery for the various modes available +// +class RecoveryTestHelper { + public: + // Number of WAL files to generate + static constexpr int kWALFilesCount = 10; + // Starting number for the WAL file name like 00010.log + static constexpr int kWALFileOffset = 10; + // Keys to be written per WAL file + static constexpr int kKeysPerWALFile = 133; + // Size of the value + static constexpr int kValueSize = 96; + + // Create WAL files with values filled in + static void FillData(DBWALTestBase* test, const Options& options, + const size_t wal_count, size_t* count) { + // Calling internal functions requires sanitized options. + Options sanitized_options = SanitizeOptions(test->dbname_, options); + const ImmutableDBOptions db_options(sanitized_options); + + *count = 0; + + std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0); + FileOptions file_options; + WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); + + std::unique_ptr<VersionSet> versions; + std::unique_ptr<WalManager> wal_manager; + WriteController write_controller; + + versions.reset(new VersionSet( + test->dbname_, &db_options, file_options, table_cache.get(), + &write_buffer_manager, &write_controller, + /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")); + + wal_manager.reset( + new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); + + std::unique_ptr<log::Writer> current_log_writer; + + for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) { + uint64_t current_log_number = j; + std::string fname = LogFileName(test->dbname_, current_log_number); + std::unique_ptr<WritableFileWriter> file_writer; + ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(), + fname, file_options, &file_writer, + nullptr)); + log::Writer* log_writer = + new log::Writer(std::move(file_writer), current_log_number, + db_options.recycle_log_file_num > 0, false, + db_options.wal_compression); + ASSERT_OK(log_writer->AddCompressionTypeRecord()); + current_log_writer.reset(log_writer); + + WriteBatch batch; + for (int i = 0; i < kKeysPerWALFile; i++) { + std::string key = "key" + std::to_string((*count)++); + std::string value = test->DummyString(kValueSize); + ASSERT_NE(current_log_writer.get(), nullptr); + uint64_t seq = versions->LastSequence() + 1; + batch.Clear(); + ASSERT_OK(batch.Put(key, value)); + WriteBatchInternal::SetSequence(&batch, seq); + ASSERT_OK(current_log_writer->AddRecord( + WriteBatchInternal::Contents(&batch))); + versions->SetLastAllocatedSequence(seq); + versions->SetLastPublishedSequence(seq); + versions->SetLastSequence(seq); + } + } + } + + // Recreate and fill the store with some data + static size_t FillData(DBWALTestBase* test, Options* options) { + options->create_if_missing = true; + test->DestroyAndReopen(*options); + test->Close(); + + size_t count = 0; + FillData(test, *options, kWALFilesCount, &count); + return count; + } + + // Read back all the keys we wrote and return the number of keys found + static size_t GetData(DBWALTestBase* test) { + size_t count = 0; + for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) { + if (test->Get("key" + std::to_string(i)) != "NOT_FOUND") { + ++count; + } + } + return count; + } + + // Manuall corrupt the specified WAL + static void CorruptWAL(DBWALTestBase* test, const Options& options, + const double off, const double len, + const int wal_file_id, const bool trunc = false) { + Env* env = options.env; + std::string fname = LogFileName(test->dbname_, wal_file_id); + uint64_t size; + ASSERT_OK(env->GetFileSize(fname, &size)); + ASSERT_GT(size, 0); +#ifdef OS_WIN + // Windows disk cache behaves differently. When we truncate + // the original content is still in the cache due to the original + // handle is still open. Generally, in Windows, one prohibits + // shared access to files and it is not needed for WAL but we allow + // it to induce corruption at various tests. + test->Close(); +#endif + if (trunc) { + ASSERT_OK( + test::TruncateFile(env, fname, static_cast<uint64_t>(size * off))); + } else { + ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(size * off + 8), + static_cast<int>(size * len), false)); + } + } +}; + +class DBWALTestWithParams : public DBWALTestBase, + public ::testing::WithParamInterface< + std::tuple<bool, int, int, CompressionType>> { + public: + DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {} +}; + +INSTANTIATE_TEST_CASE_P( + Wal, DBWALTestWithParams, + ::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1), + ::testing::Range(RecoveryTestHelper::kWALFileOffset, + RecoveryTestHelper::kWALFileOffset + + RecoveryTestHelper::kWALFilesCount, + 1), + ::testing::Values(CompressionType::kNoCompression, + CompressionType::kZSTD))); + +class DBWALTestWithParamsVaryingRecoveryMode + : public DBWALTestBase, + public ::testing::WithParamInterface< + std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> { + public: + DBWALTestWithParamsVaryingRecoveryMode() + : DBWALTestBase("/db_wal_test_with_params_mode") {} +}; + +INSTANTIATE_TEST_CASE_P( + Wal, DBWALTestWithParamsVaryingRecoveryMode, + ::testing::Combine( + ::testing::Bool(), ::testing::Range(0, 4, 1), + ::testing::Range(RecoveryTestHelper::kWALFileOffset, + RecoveryTestHelper::kWALFileOffset + + RecoveryTestHelper::kWALFilesCount, + 1), + ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords, + WALRecoveryMode::kAbsoluteConsistency, + WALRecoveryMode::kPointInTimeRecovery, + WALRecoveryMode::kSkipAnyCorruptedRecords), + ::testing::Values(CompressionType::kNoCompression, + CompressionType::kZSTD))); + +// Test scope: +// - We expect to open the data store when there is incomplete trailing writes +// at the end of any of the logs +// - We do not expect to open the data store for corruption +TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { + bool trunc = std::get<0>(GetParam()); // Corruption style + // Corruption offset position + int corrupt_offset = std::get<1>(GetParam()); + int wal_file_id = std::get<2>(GetParam()); // WAL file + + // Fill data for testing + Options options = CurrentOptions(); + const size_t row_count = RecoveryTestHelper::FillData(this, &options); + // test checksum failure or parsing + RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, + /*len%=*/.1, wal_file_id, trunc); + + options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; + if (trunc) { + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); + const size_t recovered_row_count = RecoveryTestHelper::GetData(this); + ASSERT_TRUE(corrupt_offset == 0 || recovered_row_count > 0); + ASSERT_LT(recovered_row_count, row_count); + } else { + ASSERT_NOK(TryReopen(options)); + } +} + +// Test scope: +// We don't expect the data store to be opened if there is any corruption +// (leading, middle or trailing -- incomplete writes or corruption) +TEST_P(DBWALTestWithParams, kAbsoluteConsistency) { + // Verify clean slate behavior + Options options = CurrentOptions(); + const size_t row_count = RecoveryTestHelper::FillData(this, &options); + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); + ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count); + + bool trunc = std::get<0>(GetParam()); // Corruption style + // Corruption offset position + int corrupt_offset = std::get<1>(GetParam()); + int wal_file_id = std::get<2>(GetParam()); // WAL file + // WAL compression type + CompressionType compression_type = std::get<3>(GetParam()); + options.wal_compression = compression_type; + + if (trunc && corrupt_offset == 0) { + return; + } + + // fill with new date + RecoveryTestHelper::FillData(this, &options); + // corrupt the wal + RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33, + /*len%=*/.1, wal_file_id, trunc); + // verify + options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; + options.create_if_missing = false; + ASSERT_NOK(TryReopen(options)); +} + +// Test scope: +// We don't expect the data store to be opened if there is any inconsistency +// between WAL and SST files +TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + + // Create DB with multiple column families. + CreateAndReopenWithCF({"one", "two"}, options); + ASSERT_OK(Put(1, "key1", "val1")); + ASSERT_OK(Put(2, "key2", "val2")); + + // Record the offset at this point + Env* env = options.env; + uint64_t wal_file_id = dbfull()->TEST_LogfileNumber(); + std::string fname = LogFileName(dbname_, wal_file_id); + uint64_t offset_to_corrupt; + ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); + ASSERT_GT(offset_to_corrupt, 0); + + ASSERT_OK(Put(1, "key3", "val3")); + // Corrupt WAL at location of key3 + ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt), + 4, false)); + ASSERT_OK(Put(2, "key4", "val4")); + ASSERT_OK(Put(1, "key5", "val5")); + ASSERT_OK(Flush(2)); + + // PIT recovery & verify + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options)); +} + +TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { + Options options = CurrentOptions(); + options.env = env_; + options.track_and_verify_wals_in_manifest = true; + // The following make sure there are two bg flush threads. + options.max_background_jobs = 8; + + DestroyAndReopen(options); + + const std::string cf1_name("cf1"); + CreateAndReopenWithCF({cf1_name}, options); + assert(handles_.size() == 2); + + { + dbfull()->TEST_LockMutex(); + ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes); + dbfull()->TEST_UnlockMutex(); + } + + ASSERT_OK(dbfull()->PauseBackgroundWork()); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value")); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + + ASSERT_OK(dbfull()->TEST_FlushMemTable( + /*wait=*/false, /*allow_write_stall=*/true, handles_[1])); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + + ASSERT_OK(dbfull()->TEST_FlushMemTable( + /*wait=*/false, /*allow_write_stall=*/true, handles_[0])); + + bool called = false; + std::atomic<int> bg_flush_threads{0}; + std::atomic<bool> wal_synced{false}; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) { + int cur = bg_flush_threads.load(); + int desired = cur + 1; + if (cur > 0 || + !bg_flush_threads.compare_exchange_strong(cur, desired)) { + while (!wal_synced.load()) { + // Wait until the other bg flush thread finishes committing WAL sync + // operation to the MANIFEST. + } + } + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushMemTableToOutputFile:CommitWal:1", + [&](void* /*arg*/) { wal_synced.store(true); }); + // This callback will be called when the first bg flush thread reaches the + // point before entering the MANIFEST write queue after flushing the SST + // file. + // The purpose of the sync points here is to ensure both bg flush threads + // finish computing `min_wal_number_to_keep` before any of them updates the + // `log_number` for the column family that's being flushed. + SyncPoint::GetInstance()->SetCallBack( + "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep", + [&](void* /*arg*/) { + dbfull()->mutex()->AssertHeld(); + if (!called) { + // We are the first bg flush thread in the MANIFEST write queue. + // We set up the dependency between sync points for two threads that + // will be executing the same code. + // For the interleaving of events, see + // https://github.com/facebook/rocksdb/pull/9715. + // bg flush thread1 will release the db mutex while in the MANIFEST + // write queue. In the meantime, bg flush thread2 locks db mutex and + // computes the min_wal_number_to_keep (before thread1 writes to + // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins + // the MANIFEST write queue afterwards and bg flush thread1 proceeds + // with writing to MANIFEST. + called = true; + SyncPoint::GetInstance()->LoadDependency({ + {"VersionSet::LogAndApply:WriteManifestStart", + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"}, + {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2", + "VersionSet::LogAndApply:WriteManifest"}, + }); + } else { + // The other bg flush thread has already been in the MANIFEST write + // queue, and we are after. + TEST_SYNC_POINT( + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0])); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); + + ASSERT_TRUE(called); + + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + DB* db1 = nullptr; + Status s = DB::OpenForReadOnly(options, dbname_, &db1); + ASSERT_OK(s); + assert(db1); + delete db1; +} + +// Test scope: +// - We expect to open data store under all circumstances +// - We expect only data upto the point where the first error was encountered +TEST_P(DBWALTestWithParams, kPointInTimeRecovery) { + const int maxkeys = + RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile; + + bool trunc = std::get<0>(GetParam()); // Corruption style + // Corruption offset position + int corrupt_offset = std::get<1>(GetParam()); + int wal_file_id = std::get<2>(GetParam()); // WAL file + // WAL compression type + CompressionType compression_type = std::get<3>(GetParam()); + + // Fill data for testing + Options options = CurrentOptions(); + options.wal_compression = compression_type; + const size_t row_count = RecoveryTestHelper::FillData(this, &options); + + // Corrupt the wal + // The offset here was 0.3 which cuts off right at the end of a + // valid fragment after wal zstd compression checksum is enabled, + // so changed the value to 0.33. + RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33, + /*len%=*/.1, wal_file_id, trunc); + + // Verify + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); + + // Probe data for invariants + size_t recovered_row_count = RecoveryTestHelper::GetData(this); + ASSERT_LT(recovered_row_count, row_count); + + // Verify a prefix of keys were recovered. But not in the case of full WAL + // truncation, because we have no way to know there was a corruption when + // truncation happened on record boundaries (preventing recovery holes in + // that case requires using `track_and_verify_wals_in_manifest`). + if (!trunc || corrupt_offset != 0) { + bool expect_data = true; + for (size_t k = 0; k < maxkeys; ++k) { + bool found = Get("key" + std::to_string(k)) != "NOT_FOUND"; + if (expect_data && !found) { + expect_data = false; + } + ASSERT_EQ(found, expect_data); + } + } + + const size_t min = RecoveryTestHelper::kKeysPerWALFile * + (wal_file_id - RecoveryTestHelper::kWALFileOffset); + ASSERT_GE(recovered_row_count, min); + if (!trunc && corrupt_offset != 0) { + const size_t max = RecoveryTestHelper::kKeysPerWALFile * + (wal_file_id - RecoveryTestHelper::kWALFileOffset + 1); + ASSERT_LE(recovered_row_count, max); + } +} + +// Test scope: +// - We expect to open the data store under all scenarios +// - We expect to have recovered records past the corruption zone +TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) { + bool trunc = std::get<0>(GetParam()); // Corruption style + // Corruption offset position + int corrupt_offset = std::get<1>(GetParam()); + int wal_file_id = std::get<2>(GetParam()); // WAL file + // WAL compression type + CompressionType compression_type = std::get<3>(GetParam()); + + // Fill data for testing + Options options = CurrentOptions(); + options.wal_compression = compression_type; + const size_t row_count = RecoveryTestHelper::FillData(this, &options); + + // Corrupt the WAL + RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, + /*len%=*/.1, wal_file_id, trunc); + + // Verify behavior + options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords; + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); + + // Probe data for invariants + size_t recovered_row_count = RecoveryTestHelper::GetData(this); + ASSERT_LT(recovered_row_count, row_count); + + if (!trunc) { + ASSERT_TRUE(corrupt_offset != 0 || recovered_row_count > 0); + } +} + +TEST_F(DBWALTest, AvoidFlushDuringRecovery) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.avoid_flush_during_recovery = false; + + // Test with flush after recovery. + Reopen(options); + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "v3")); + ASSERT_OK(Put("bar", "v4")); + ASSERT_EQ(1, TotalTableFiles()); + // Reopen DB. Check if WAL logs flushed. + Reopen(options); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v4", Get("bar")); + ASSERT_EQ(2, TotalTableFiles()); + + // Test without flush after recovery. + options.avoid_flush_during_recovery = true; + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "v5")); + ASSERT_OK(Put("bar", "v6")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "v7")); + ASSERT_OK(Put("bar", "v8")); + ASSERT_EQ(1, TotalTableFiles()); + // Reopen DB. WAL logs should not be flushed this time. + Reopen(options); + ASSERT_EQ("v7", Get("foo")); + ASSERT_EQ("v8", Get("bar")); + ASSERT_EQ(1, TotalTableFiles()); + + // Force flush with allow_2pc. + options.avoid_flush_during_recovery = true; + options.allow_2pc = true; + ASSERT_OK(Put("foo", "v9")); + ASSERT_OK(Put("bar", "v10")); + ASSERT_OK(Flush()); + ASSERT_OK(Put("foo", "v11")); + ASSERT_OK(Put("bar", "v12")); + Reopen(options); + ASSERT_EQ("v11", Get("foo")); + ASSERT_EQ("v12", Get("bar")); + ASSERT_EQ(3, TotalTableFiles()); +} + +TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) { + // Verifies WAL files that were present during recovery, but not flushed due + // to avoid_flush_during_recovery, will be considered for deletion at a later + // stage. We check at least one such file is deleted during Flush(). + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.avoid_flush_during_recovery = true; + Reopen(options); + + ASSERT_OK(Put("foo", "v1")); + Reopen(options); + for (int i = 0; i < 2; ++i) { + if (i > 0) { + // Flush() triggers deletion of obsolete tracked files + ASSERT_OK(Flush()); + } + VectorLogPtr log_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); + if (i == 0) { + ASSERT_GT(log_files.size(), 0); + } else { + ASSERT_EQ(0, log_files.size()); + } + } +} + +TEST_F(DBWALTest, RecoverWithoutFlush) { + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.create_if_missing = false; + options.disable_auto_compactions = true; + options.write_buffer_size = 64 * 1024 * 1024; + + size_t count = RecoveryTestHelper::FillData(this, &options); + auto validateData = [this, count]() { + for (size_t i = 0; i < count; i++) { + ASSERT_NE(Get("key" + std::to_string(i)), "NOT_FOUND"); + } + }; + Reopen(options); + validateData(); + // Insert some data without flush + ASSERT_OK(Put("foo", "foo_v1")); + ASSERT_OK(Put("bar", "bar_v1")); + Reopen(options); + validateData(); + ASSERT_EQ(Get("foo"), "foo_v1"); + ASSERT_EQ(Get("bar"), "bar_v1"); + // Insert again and reopen + ASSERT_OK(Put("foo", "foo_v2")); + ASSERT_OK(Put("bar", "bar_v2")); + Reopen(options); + validateData(); + ASSERT_EQ(Get("foo"), "foo_v2"); + ASSERT_EQ(Get("bar"), "bar_v2"); + // manual flush and insert again + ASSERT_OK(Flush()); + ASSERT_EQ(Get("foo"), "foo_v2"); + ASSERT_EQ(Get("bar"), "bar_v2"); + ASSERT_OK(Put("foo", "foo_v3")); + ASSERT_OK(Put("bar", "bar_v3")); + Reopen(options); + validateData(); + ASSERT_EQ(Get("foo"), "foo_v3"); + ASSERT_EQ(Get("bar"), "bar_v3"); +} + +TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) { + const std::string kSmallValue = "v"; + const std::string kLargeValue = DummyString(1024); + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.create_if_missing = false; + options.disable_auto_compactions = true; + + auto countWalFiles = [this]() { + VectorLogPtr log_files; + if (!dbfull()->GetSortedWalFiles(log_files).ok()) { + return size_t{0}; + } + return log_files.size(); + }; + + // Create DB with multiple column families and multiple log files. + CreateAndReopenWithCF({"one", "two"}, options); + ASSERT_OK(Put(0, "key1", kSmallValue)); + ASSERT_OK(Put(1, "key2", kLargeValue)); + ASSERT_OK(Flush(1)); + ASSERT_EQ(1, countWalFiles()); + ASSERT_OK(Put(0, "key3", kSmallValue)); + ASSERT_OK(Put(2, "key4", kLargeValue)); + ASSERT_OK(Flush(2)); + ASSERT_EQ(2, countWalFiles()); + + // Reopen, insert and flush. + options.db_write_buffer_size = 64 * 1024 * 1024; + ReopenWithColumnFamilies({"default", "one", "two"}, options); + ASSERT_EQ(Get(0, "key1"), kSmallValue); + ASSERT_EQ(Get(1, "key2"), kLargeValue); + ASSERT_EQ(Get(0, "key3"), kSmallValue); + ASSERT_EQ(Get(2, "key4"), kLargeValue); + // Insert more data. + ASSERT_OK(Put(0, "key5", kLargeValue)); + ASSERT_OK(Put(1, "key6", kLargeValue)); + ASSERT_EQ(3, countWalFiles()); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(2, "key7", kLargeValue)); + ASSERT_OK(dbfull()->FlushWAL(false)); + ASSERT_EQ(4, countWalFiles()); + + // Reopen twice and validate. + for (int i = 0; i < 2; i++) { + ReopenWithColumnFamilies({"default", "one", "two"}, options); + ASSERT_EQ(Get(0, "key1"), kSmallValue); + ASSERT_EQ(Get(1, "key2"), kLargeValue); + ASSERT_EQ(Get(0, "key3"), kSmallValue); + ASSERT_EQ(Get(2, "key4"), kLargeValue); + ASSERT_EQ(Get(0, "key5"), kLargeValue); + ASSERT_EQ(Get(1, "key6"), kLargeValue); + ASSERT_EQ(Get(2, "key7"), kLargeValue); + ASSERT_EQ(4, countWalFiles()); + } +} + +// In this test we are trying to do the following: +// 1. Create a DB with corrupted WAL log; +// 2. Open with avoid_flush_during_recovery = true; +// 3. Append more data without flushing, which creates new WAL log. +// 4. Open again. See if it can correctly handle previous corruption. +TEST_P(DBWALTestWithParamsVaryingRecoveryMode, + RecoverFromCorruptedWALWithoutFlush) { + const int kAppendKeys = 100; + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.create_if_missing = false; + options.disable_auto_compactions = true; + options.write_buffer_size = 64 * 1024 * 1024; + + auto getAll = [this]() { + std::vector<std::pair<std::string, std::string>> data; + ReadOptions ropt; + Iterator* iter = dbfull()->NewIterator(ropt); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + data.push_back( + std::make_pair(iter->key().ToString(), iter->value().ToString())); + } + delete iter; + return data; + }; + + bool trunc = std::get<0>(GetParam()); // Corruption style + // Corruption offset position + int corrupt_offset = std::get<1>(GetParam()); + int wal_file_id = std::get<2>(GetParam()); // WAL file + WALRecoveryMode recovery_mode = std::get<3>(GetParam()); + // WAL compression type + CompressionType compression_type = std::get<4>(GetParam()); + + options.wal_recovery_mode = recovery_mode; + options.wal_compression = compression_type; + // Create corrupted WAL + RecoveryTestHelper::FillData(this, &options); + RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, + /*len%=*/.1, wal_file_id, trunc); + // Skip the test if DB won't open. + if (!TryReopen(options).ok()) { + ASSERT_TRUE(options.wal_recovery_mode == + WALRecoveryMode::kAbsoluteConsistency || + (!trunc && options.wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords)); + return; + } + ASSERT_OK(TryReopen(options)); + // Append some more data. + for (int k = 0; k < kAppendKeys; k++) { + std::string key = "extra_key" + std::to_string(k); + std::string value = DummyString(RecoveryTestHelper::kValueSize); + ASSERT_OK(Put(key, value)); + } + // Save data for comparison. + auto data = getAll(); + // Reopen. Verify data. + ASSERT_OK(TryReopen(options)); + auto actual_data = getAll(); + ASSERT_EQ(data, actual_data); +} + +// Tests that total log size is recovered if we set +// avoid_flush_during_recovery=true. +// Flush should trigger if max_total_wal_size is reached. +TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) { + auto test_listener = std::make_shared<FlushCounterListener>(); + test_listener->expected_flush_reason = FlushReason::kWalFull; + + constexpr size_t kKB = 1024; + constexpr size_t kMB = 1024 * 1024; + Options options = CurrentOptions(); + options.avoid_flush_during_recovery = true; + options.max_total_wal_size = 1 * kMB; + options.listeners.push_back(test_listener); + // Have to open DB in multi-CF mode to trigger flush when + // max_total_wal_size is reached. + CreateAndReopenWithCF({"one"}, options); + // Write some keys and we will end up with one log file which is slightly + // smaller than 1MB. + std::string value_100k(100 * kKB, 'v'); + std::string value_300k(300 * kKB, 'v'); + ASSERT_OK(Put(0, "foo", "v1")); + for (int i = 0; i < 9; i++) { + ASSERT_OK(Put(1, "key" + std::to_string(i), value_100k)); + } + // Get log files before reopen. + VectorLogPtr log_files_before; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); + ASSERT_EQ(1, log_files_before.size()); + uint64_t log_size_before = log_files_before[0]->SizeFileBytes(); + ASSERT_GT(log_size_before, 900 * kKB); + ASSERT_LT(log_size_before, 1 * kMB); + ReopenWithColumnFamilies({"default", "one"}, options); + // Write one more value to make log larger than 1MB. + ASSERT_OK(Put(1, "bar", value_300k)); + // Get log files again. A new log file will be opened. + VectorLogPtr log_files_after_reopen; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen)); + ASSERT_EQ(2, log_files_after_reopen.size()); + ASSERT_EQ(log_files_before[0]->LogNumber(), + log_files_after_reopen[0]->LogNumber()); + ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() + + log_files_after_reopen[1]->SizeFileBytes(), + 1 * kMB); + // Write one more key to trigger flush. + ASSERT_OK(Put(0, "foo", "v2")); + for (auto* h : handles_) { + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h)); + } + // Flushed two column families. + ASSERT_EQ(2, test_listener->count.load()); +} + +#if defined(ROCKSDB_PLATFORM_POSIX) +#if defined(ROCKSDB_FALLOCATE_PRESENT) +// Tests that we will truncate the preallocated space of the last log from +// previous. +TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) { + constexpr size_t kKB = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.avoid_flush_during_recovery = true; + if (mem_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); + return; + } + if (!IsFallocateSupported()) { + return; + } + + DestroyAndReopen(options); + size_t preallocated_size = + dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); + ASSERT_OK(Put("foo", "v1")); + VectorLogPtr log_files_before; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); + ASSERT_EQ(1, log_files_before.size()); + auto& file_before = log_files_before[0]; + ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB); + // The log file has preallocated space. + ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()), + preallocated_size); + Reopen(options); + VectorLogPtr log_files_after; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after)); + ASSERT_EQ(1, log_files_after.size()); + ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB); + // The preallocated space should be truncated. + ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()), + preallocated_size); +} +// Tests that we will truncate the preallocated space of the last log from +// previous. +TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) { + constexpr size_t kKB = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.avoid_flush_during_recovery = false; + options.avoid_flush_during_shutdown = true; + if (mem_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); + return; + } + if (!IsFallocateSupported()) { + return; + } + + DestroyAndReopen(options); + size_t preallocated_size = + dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); + ASSERT_OK(Put("foo", "v1")); + VectorLogPtr log_files_before; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); + ASSERT_EQ(1, log_files_before.size()); + auto& file_before = log_files_before[0]; + ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB); + ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()), + preallocated_size); + // The log file has preallocated space. + Close(); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::PurgeObsoleteFiles:Begin", + "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"}, + {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate", + "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + port::Thread reopen_thread([&]() { Reopen(options); }); + + TEST_SYNC_POINT( + "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"); + // After the flush during Open, the log file should get deleted. However, + // if the process is in a crash loop, the log file may not get + // deleted and thte preallocated space will keep accumulating. So we need + // to ensure it gets trtuncated. + EXPECT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()), + preallocated_size); + TEST_SYNC_POINT( + "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate"); + reopen_thread.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) { + Options options = CurrentOptions(); + options.env = env_; + options.avoid_flush_during_recovery = false; + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment"); + return; + } + if (!IsFallocateSupported()) { + return; + } + + DestroyAndReopen(options); + size_t preallocated_size = + dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); + Close(); + std::vector<std::string> filenames; + std::string last_log; + uint64_t last_log_num = 0; + ASSERT_OK(env_->GetChildren(dbname_, &filenames)); + for (auto fname : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(fname, &number, &type, nullptr)) { + if (type == kWalFile && number > last_log_num) { + last_log = fname; + } + } + } + ASSERT_NE(last_log, ""); + last_log = dbname_ + '/' + last_log; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::PurgeObsoleteFiles:Begin", + "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"}, + {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate", + "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixWritableFile::Close", + [](void* arg) { *(reinterpret_cast<size_t*>(arg)) = 0; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Preallocate space for the empty log file. This could happen if WAL data + // was buffered in memory and the process crashed. + std::unique_ptr<WritableFile> log_file; + ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions())); + log_file->SetPreallocationBlockSize(preallocated_size); + log_file->PrepareWrite(0, 4096); + log_file.reset(); + + ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size); + + port::Thread reopen_thread([&]() { Reopen(options); }); + + TEST_SYNC_POINT( + "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"); + // The preallocated space should be truncated. + EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size); + TEST_SYNC_POINT( + "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate"); + reopen_thread.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBWALTest, ReadOnlyRecoveryNoTruncate) { + constexpr size_t kKB = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.avoid_flush_during_recovery = true; + if (mem_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); + return; + } + if (!IsFallocateSupported()) { + return; + } + + // create DB and close with file truncate disabled + std::atomic_bool enable_truncate{false}; + + SyncPoint::GetInstance()->SetCallBack( + "PosixWritableFile::Close", [&](void* arg) { + if (!enable_truncate) { + *(reinterpret_cast<size_t*>(arg)) = 0; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + size_t preallocated_size = + dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); + ASSERT_OK(Put("foo", "v1")); + VectorLogPtr log_files_before; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); + ASSERT_EQ(1, log_files_before.size()); + auto& file_before = log_files_before[0]; + ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB); + // The log file has preallocated space. + auto db_size = GetAllocatedFileSize(dbname_ + file_before->PathName()); + ASSERT_GE(db_size, preallocated_size); + Close(); + + // enable truncate and open DB as readonly, the file should not be truncated + // and DB size is not changed. + enable_truncate = true; + ASSERT_OK(ReadOnlyReopen(options)); + VectorLogPtr log_files_after; + ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after)); + ASSERT_EQ(1, log_files_after.size()); + ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB); + ASSERT_EQ(log_files_after[0]->PathName(), file_before->PathName()); + // The preallocated space should NOT be truncated. + // the DB size is almost the same. + ASSERT_NEAR(GetAllocatedFileSize(dbname_ + file_before->PathName()), db_size, + db_size / 100); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} +#endif // ROCKSDB_FALLOCATE_PRESENT +#endif // ROCKSDB_PLATFORM_POSIX + +TEST_F(DBWALTest, WalInManifestButNotInSortedWals) { + Options options = CurrentOptions(); + options.track_and_verify_wals_in_manifest = true; + options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; + + // Build a way to make wal files selectively go missing + bool wals_go_missing = false; + struct MissingWalFs : public FileSystemWrapper { + MissingWalFs(const std::shared_ptr<FileSystem>& t, + bool* _wals_go_missing_flag) + : FileSystemWrapper(t), wals_go_missing_flag(_wals_go_missing_flag) {} + bool* wals_go_missing_flag; + IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts, + std::vector<std::string>* r, + IODebugContext* dbg) override { + IOStatus s = target_->GetChildren(dir, io_opts, r, dbg); + if (s.ok() && *wals_go_missing_flag) { + for (size_t i = 0; i < r->size();) { + if (EndsWith(r->at(i), ".log")) { + r->erase(r->begin() + i); + } else { + ++i; + } + } + } + return s; + } + const char* Name() const override { return "MissingWalFs"; } + }; + auto my_fs = + std::make_shared<MissingWalFs>(env_->GetFileSystem(), &wals_go_missing); + std::unique_ptr<Env> my_env(NewCompositeEnv(my_fs)); + options.env = my_env.get(); + + CreateAndReopenWithCF({"blah"}, options); + + // Currently necessary to get a WAL tracked in manifest; see + // https://github.com/facebook/rocksdb/issues/10080 + ASSERT_OK(Put(0, "x", "y")); + ASSERT_OK(db_->SyncWAL()); + ASSERT_OK(Put(1, "x", "y")); + ASSERT_OK(db_->SyncWAL()); + ASSERT_OK(Flush(1)); + + ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); + std::vector<std::unique_ptr<LogFile>> wals; + ASSERT_OK(db_->GetSortedWalFiles(wals)); + wals_go_missing = true; + ASSERT_NOK(db_->GetSortedWalFiles(wals)); + wals_go_missing = false; + Close(); +} + +#endif // ROCKSDB_LITE + +TEST_F(DBWALTest, WalTermTest) { + Options options = CurrentOptions(); + options.env = env_; + CreateAndReopenWithCF({"pikachu"}, options); + + ASSERT_OK(Put(1, "foo", "bar")); + + WriteOptions wo; + wo.sync = true; + wo.disableWAL = false; + + WriteBatch batch; + ASSERT_OK(batch.Put("foo", "bar")); + batch.MarkWalTerminationPoint(); + ASSERT_OK(batch.Put("foo2", "bar2")); + + ASSERT_OK(dbfull()->Write(wo, &batch)); + + // make sure we can re-open it. + ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); + ASSERT_EQ("bar", Get(1, "foo")); + ASSERT_EQ("NOT_FOUND", Get(1, "foo2")); +} + +#ifndef ROCKSDB_LITE +TEST_F(DBWALTest, GetCompressedWalsAfterSync) { + if (db_->GetOptions().wal_compression == kNoCompression) { + ROCKSDB_GTEST_BYPASS("stream compression not present"); + return; + } + Options options = GetDefaultOptions(); + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.create_if_missing = true; + options.env = env_; + options.avoid_flush_during_recovery = true; + options.track_and_verify_wals_in_manifest = true; + // Enable WAL compression so that the newly-created WAL will be non-empty + // after DB open, even if point-in-time WAL recovery encounters no + // corruption. + options.wal_compression = kZSTD; + DestroyAndReopen(options); + + // Write something to memtable and WAL so that log_empty_ will be false after + // next DB::Open(). + ASSERT_OK(Put("a", "v")); + + Reopen(options); + + // New WAL is created, thanks to !log_empty_. + ASSERT_OK(dbfull()->TEST_SwitchWAL()); + + ASSERT_OK(Put("b", "v")); + + ASSERT_OK(db_->SyncWAL()); + + VectorLogPtr wals; + Status s = dbfull()->GetSortedWalFiles(wals); + ASSERT_OK(s); +} +#endif // ROCKSDB_LITE +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |