diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/util/file_reader_writer_test.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/util/file_reader_writer_test.cc')
-rw-r--r-- | src/rocksdb/util/file_reader_writer_test.cc | 1066 |
1 files changed, 1066 insertions, 0 deletions
diff --git a/src/rocksdb/util/file_reader_writer_test.cc b/src/rocksdb/util/file_reader_writer_test.cc new file mode 100644 index 000000000..e778efc3c --- /dev/null +++ b/src/rocksdb/util/file_reader_writer_test.cc @@ -0,0 +1,1066 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include <algorithm> +#include <vector> + +#include "db/db_test_util.h" +#include "env/mock_env.h" +#include "file/line_file_reader.h" +#include "file/random_access_file_reader.h" +#include "file/read_write_util.h" +#include "file/readahead_raf.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" +#include "rocksdb/file_system.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "util/crc32c.h" +#include "util/random.h" +#include "utilities/fault_injection_fs.h" + +namespace ROCKSDB_NAMESPACE { + +class WritableFileWriterTest : public testing::Test {}; + +constexpr uint32_t kMb = static_cast<uint32_t>(1) << 20; + +TEST_F(WritableFileWriterTest, RangeSync) { + class FakeWF : public FSWritableFile { + public: + explicit FakeWF() : size_(0), last_synced_(0) {} + ~FakeWF() override {} + + using FSWritableFile::Append; + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + size_ += data.size(); + return IOStatus::OK(); + } + IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + EXPECT_GE(size_, last_synced_ + kMb); + EXPECT_LT(size_, last_synced_ + 2 * kMb); + // Make sure random writes generated enough writes. + EXPECT_GT(size_, 10 * kMb); + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + void SetIOPriority(Env::IOPriority /*pri*/) override {} + uint64_t GetFileSize(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return size_; + } + void GetPreallocationStatus(size_t* /*block_size*/, + size_t* /*last_allocated_block*/) override {} + size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { + return 0; + } + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); + } + + protected: + IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus RangeSync(uint64_t offset, uint64_t nbytes, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(offset % 4096, 0u); + EXPECT_EQ(nbytes % 4096, 0u); + + EXPECT_EQ(offset, last_synced_); + last_synced_ = offset + nbytes; + EXPECT_GE(size_, last_synced_ + kMb); + if (size_ > 2 * kMb) { + EXPECT_LT(size_, last_synced_ + 2 * kMb); + } + return IOStatus::OK(); + } + + uint64_t size_; + uint64_t last_synced_; + }; + + EnvOptions env_options; + env_options.bytes_per_sync = kMb; + std::unique_ptr<FakeWF> wf(new FakeWF); + std::unique_ptr<WritableFileWriter> writer( + new WritableFileWriter(std::move(wf), "" /* don't care */, env_options)); + Random r(301); + Status s; + std::unique_ptr<char[]> large_buf(new char[10 * kMb]); + for (int i = 0; i < 1000; i++) { + int skew_limit = (i < 700) ? 10 : 15; + uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100); + s = writer->Append(Slice(large_buf.get(), num)); + ASSERT_OK(s); + + // Flush in a chance of 1/10. + if (r.Uniform(10) == 0) { + s = writer->Flush(); + ASSERT_OK(s); + } + } + s = writer->Close(); + ASSERT_OK(s); +} + +TEST_F(WritableFileWriterTest, IncrementalBuffer) { + class FakeWF : public FSWritableFile { + public: + explicit FakeWF(std::string* _file_data, bool _use_direct_io, + bool _no_flush) + : file_data_(_file_data), + use_direct_io_(_use_direct_io), + no_flush_(_no_flush) {} + ~FakeWF() override {} + + using FSWritableFile::Append; + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + file_data_->append(data.data(), data.size()); + size_ += data.size(); + return IOStatus::OK(); + } + using FSWritableFile::PositionedAppend; + IOStatus PositionedAppend(const Slice& data, uint64_t pos, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + EXPECT_TRUE(pos % 512 == 0); + EXPECT_TRUE(data.size() % 512 == 0); + file_data_->resize(pos); + file_data_->append(data.data(), data.size()); + size_ += data.size(); + return IOStatus::OK(); + } + + IOStatus Truncate(uint64_t size, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + file_data_->resize(size); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + void SetIOPriority(Env::IOPriority /*pri*/) override {} + uint64_t GetFileSize(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return size_; + } + void GetPreallocationStatus(size_t* /*block_size*/, + size_t* /*last_allocated_block*/) override {} + size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { + return 0; + } + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); + } + bool use_direct_io() const override { return use_direct_io_; } + + std::string* file_data_; + bool use_direct_io_; + bool no_flush_; + size_t size_ = 0; + }; + + Random r(301); + const int kNumAttempts = 50; + for (int attempt = 0; attempt < kNumAttempts; attempt++) { + bool no_flush = (attempt % 3 == 0); + EnvOptions env_options; + env_options.writable_file_max_buffer_size = + (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024; + std::string actual; + std::unique_ptr<FakeWF> wf(new FakeWF(&actual, +#ifndef ROCKSDB_LITE + attempt % 2 == 1, +#else + false, +#endif + no_flush)); + std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter( + std::move(wf), "" /* don't care */, env_options)); + + std::string target; + for (int i = 0; i < 20; i++) { + uint32_t num = r.Skewed(16) * 100 + r.Uniform(100); + std::string random_string = r.RandomString(num); + ASSERT_OK(writer->Append(Slice(random_string.c_str(), num))); + target.append(random_string.c_str(), num); + + // In some attempts, flush in a chance of 1/10. + if (!no_flush && r.Uniform(10) == 0) { + ASSERT_OK(writer->Flush()); + } + } + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Close()); + ASSERT_EQ(target.size(), actual.size()); + ASSERT_EQ(target, actual); + } +} + +TEST_F(WritableFileWriterTest, BufferWithZeroCapacityDirectIO) { + EnvOptions env_opts; + env_opts.use_direct_writes = true; + env_opts.writable_file_max_buffer_size = 0; + { + std::unique_ptr<WritableFileWriter> writer; + const Status s = + WritableFileWriter::Create(FileSystem::Default(), /*fname=*/"dont_care", + FileOptions(env_opts), &writer, + /*dbg=*/nullptr); + ASSERT_TRUE(s.IsInvalidArgument()); + } +} + +class DBWritableFileWriterTest : public DBTestBase { + public: + DBWritableFileWriterTest() + : DBTestBase("db_secondary_cache_test", /*env_do_fsync=*/true) { + fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem())); + fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_)); + } + + std::shared_ptr<FaultInjectionTestFS> fault_fs_; + std::unique_ptr<Env> fault_env_; +}; + +TEST_F(DBWritableFileWriterTest, AppendWithChecksum) { + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = dbname_ + "/test_file"; + std::unique_ptr<FSWritableFile> writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr<TestFSWritableFile> file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr<WritableFileWriter> file_writer; + ImmutableOptions ioptions(options); + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + Random rnd(301); + std::string data = rnd.RandomString(1000); + uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size()); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + ASSERT_OK(file_writer->Flush()); + Random size_r(47); + for (int i = 0; i < 2000; i++) { + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + Destroy(options); +} + +TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) { + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = dbname_ + "/test_file"; + std::unique_ptr<FSWritableFile> writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr<TestFSWritableFile> file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr<WritableFileWriter> file_writer; + ImmutableOptions ioptions(options); + // Enable checksum handoff for this file, but do not enable buffer checksum. + // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, false)); + + Random rnd(301); + std::string data = rnd.RandomString(1000); + uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size()); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + ASSERT_OK(file_writer->Flush()); + Random size_r(47); + for (int i = 0; i < 1000; i++) { + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + Destroy(options); +} + +TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) { + FileOptions file_options = FileOptions(); + file_options.rate_limiter = nullptr; + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = dbname_ + "/test_file"; + std::unique_ptr<FSWritableFile> writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr<TestFSWritableFile> file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr<WritableFileWriter> file_writer; + ImmutableOptions ioptions(options); + // Enable checksum handoff for this file, but do not enable buffer checksum. + // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + Random rnd(301); + std::string data; + uint32_t data_crc32c; + uint64_t start = fault_env_->NowMicros(); + Random size_r(47); + uint64_t bytes_written = 0; + for (int i = 0; i < 100; i++) { + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + bytes_written += static_cast<uint64_t>(data.size()); + + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + bytes_written += static_cast<uint64_t>(data.size()); + } + uint64_t elapsed = fault_env_->NowMicros() - start; + double raw_rate = bytes_written * 1000000.0 / elapsed; + ASSERT_OK(file_writer->Close()); + + // Set the rate-limiter + FileOptions file_options1 = FileOptions(); + file_options1.rate_limiter = + NewGenericRateLimiter(static_cast<int64_t>(0.5 * raw_rate)); + fname = dbname_ + "/test_file_1"; + std::unique_ptr<FSWritableFile> writable_file_ptr1; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1, + &writable_file_ptr1, + /*dbg*/ nullptr)); + file.reset(new TestFSWritableFile( + fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get())); + // Enable checksum handoff for this file, but do not enable buffer checksum. + // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options1, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + for (int i = 0; i < 1000; i++) { + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast<int>(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + if (file_options1.rate_limiter != nullptr) { + delete file_options1.rate_limiter; + } + + Destroy(options); +} + +#ifndef ROCKSDB_LITE +TEST_F(WritableFileWriterTest, AppendStatusReturn) { + class FakeWF : public FSWritableFile { + public: + explicit FakeWF() : use_direct_io_(false), io_error_(false) {} + + bool use_direct_io() const override { return use_direct_io_; } + + using FSWritableFile::Append; + IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + using FSWritableFile::PositionedAppend; + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + void Setuse_direct_io(bool val) { use_direct_io_ = val; } + void SetIOError(bool val) { io_error_ = val; } + + protected: + bool use_direct_io_; + bool io_error_; + }; + std::unique_ptr<FakeWF> wf(new FakeWF()); + wf->Setuse_direct_io(true); + std::unique_ptr<WritableFileWriter> writer( + new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions())); + + ASSERT_OK(writer->Append(std::string(2 * kMb, 'a'))); + + // Next call to WritableFile::Append() should fail + FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file()); + fwf->SetIOError(true); + ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b'))); +} +#endif + +class ReadaheadRandomAccessFileTest + : public testing::Test, + public testing::WithParamInterface<size_t> { + public: + static std::vector<size_t> GetReadaheadSizeList() { + return {1lu << 12, 1lu << 16}; + } + void SetUp() override { + readahead_size_ = GetParam(); + scratch_.reset(new char[2 * readahead_size_]); + ResetSourceStr(); + } + ReadaheadRandomAccessFileTest() : control_contents_() {} + std::string Read(uint64_t offset, size_t n) { + Slice result; + Status s = test_read_holder_->Read(offset, n, IOOptions(), &result, + scratch_.get(), nullptr); + EXPECT_TRUE(s.ok() || s.IsInvalidArgument()); + return std::string(result.data(), result.size()); + } + void ResetSourceStr(const std::string& str = "") { + std::unique_ptr<FSWritableFile> sink( + new test::StringSink(&control_contents_)); + std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); + Status s = write_holder->Append(Slice(str)); + EXPECT_OK(s); + s = write_holder->Flush(); + EXPECT_OK(s); + std::unique_ptr<FSRandomAccessFile> read_holder( + new test::StringSource(control_contents_)); + test_read_holder_ = + NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_); + } + size_t GetReadaheadSize() const { return readahead_size_; } + + private: + size_t readahead_size_; + Slice control_contents_; + std::unique_ptr<FSRandomAccessFile> test_read_holder_; + std::unique_ptr<char[]> scratch_; +}; + +TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) { + ASSERT_EQ("", Read(0, 1)); + ASSERT_EQ("", Read(0, 0)); + ASSERT_EQ("", Read(13, 13)); +} + +TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) { + std::string str = "abcdefghijklmnopqrs"; + ResetSourceStr(str); + ASSERT_EQ(str.substr(3, 4), Read(3, 4)); + ASSERT_EQ(str.substr(0, 3), Read(0, 3)); + ASSERT_EQ(str, Read(0, str.size())); + ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)), + Read(7, 30)); + ASSERT_EQ("", Read(100, 100)); +} + +TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) { + Random rng(42); + for (int k = 0; k < 100; ++k) { + size_t strLen = k * GetReadaheadSize() + + rng.Uniform(static_cast<int>(GetReadaheadSize())); + std::string str = rng.HumanReadableString(static_cast<int>(strLen)); + ResetSourceStr(str); + for (int test = 1; test <= 100; ++test) { + size_t offset = rng.Uniform(static_cast<int>(strLen)); + size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize())); + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), + Read(offset, n)); + } + } +} + +TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) { + Random rng(7); + size_t strLen = 4 * GetReadaheadSize() + + rng.Uniform(static_cast<int>(GetReadaheadSize())); + std::string str = rng.HumanReadableString(static_cast<int>(strLen)); + ResetSourceStr(str); + for (int test = 1; test <= 100; ++test) { + size_t offset = rng.Uniform(static_cast<int>(strLen)); + size_t n = + GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize())); + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), + Read(offset, n)); + } +} + +INSTANTIATE_TEST_CASE_P( + EmptySourceStr, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); + +class ReadaheadSequentialFileTest : public testing::Test, + public testing::WithParamInterface<size_t> { + public: + static std::vector<size_t> GetReadaheadSizeList() { + return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18}; + } + void SetUp() override { + readahead_size_ = GetParam(); + scratch_.reset(new char[2 * readahead_size_]); + ResetSourceStr(); + } + ReadaheadSequentialFileTest() {} + std::string Read(size_t n) { + Slice result; + Status s = test_read_holder_->Read( + n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority*/); + EXPECT_TRUE(s.ok() || s.IsInvalidArgument()); + return std::string(result.data(), result.size()); + } + void Skip(size_t n) { test_read_holder_->Skip(n); } + void ResetSourceStr(const std::string& str = "") { + auto read_holder = std::unique_ptr<FSSequentialFile>( + new test::SeqStringSource(str, &seq_read_count_)); + test_read_holder_.reset(new SequentialFileReader(std::move(read_holder), + "test", readahead_size_)); + } + size_t GetReadaheadSize() const { return readahead_size_; } + + private: + size_t readahead_size_; + std::unique_ptr<SequentialFileReader> test_read_holder_; + std::unique_ptr<char[]> scratch_; + std::atomic<int> seq_read_count_; +}; + +TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) { + ASSERT_EQ("", Read(0)); + ASSERT_EQ("", Read(1)); + ASSERT_EQ("", Read(13)); +} + +TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) { + std::string str = "abcdefghijklmnopqrs"; + ResetSourceStr(str); + ASSERT_EQ(str.substr(0, 3), Read(3)); + ASSERT_EQ(str.substr(3, 1), Read(1)); + ASSERT_EQ(str.substr(4), Read(str.size())); + ASSERT_EQ("", Read(100)); +} + +TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) { + Random rng(42); + for (int s = 0; s < 1; ++s) { + for (int k = 0; k < 100; ++k) { + size_t strLen = k * GetReadaheadSize() + + rng.Uniform(static_cast<int>(GetReadaheadSize())); + std::string str = rng.HumanReadableString(static_cast<int>(strLen)); + ResetSourceStr(str); + size_t offset = 0; + for (int test = 1; test <= 100; ++test) { + size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize())); + if (s && test % 2) { + Skip(n); + } else { + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n)); + } + offset = std::min(offset + n, strLen); + } + } + } +} + +TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) { + Random rng(42); + for (int s = 0; s < 1; ++s) { + for (int k = 0; k < 100; ++k) { + size_t strLen = k * GetReadaheadSize() + + rng.Uniform(static_cast<int>(GetReadaheadSize())); + std::string str = rng.HumanReadableString(static_cast<int>(strLen)); + ResetSourceStr(str); + size_t offset = 0; + for (int test = 1; test <= 100; ++test) { + size_t n = GetReadaheadSize() + + rng.Uniform(static_cast<int>(GetReadaheadSize())); + if (s && test % 2) { + Skip(n); + } else { + ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n)); + } + offset = std::min(offset + n, strLen); + } + } + } +} + +INSTANTIATE_TEST_CASE_P( + EmptySourceStr, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + ReadExceedsReadaheadSize, ReadaheadSequentialFileTest, + ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList())); + +namespace { +std::string GenerateLine(int n) { + std::string rv; + // Multiples of 17 characters per line, for likely bad buffer alignment + for (int i = 0; i < n; ++i) { + rv.push_back(static_cast<char>('0' + (i % 10))); + rv.append("xxxxxxxxxxxxxxxx"); + } + return rv; +} +} // namespace + +TEST(LineFileReaderTest, LineFileReaderTest) { + const int nlines = 1000; + + std::unique_ptr<Env> mem_env(MockEnv::Create(Env::Default())); + std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem(); + // Create an input file + { + std::unique_ptr<FSWritableFile> file; + ASSERT_OK( + fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr)); + + for (int i = 0; i < nlines; ++i) { + std::string line = GenerateLine(i); + line.push_back('\n'); + ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr)); + } + } + + // Verify with no I/O errors + { + std::unique_ptr<LineFileReader> reader; + ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader, + nullptr /* dbg */, + nullptr /* rate_limiter */)); + std::string line; + int count = 0; + while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { + ASSERT_EQ(line, GenerateLine(count)); + ++count; + ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count); + } + ASSERT_OK(reader->GetStatus()); + ASSERT_EQ(count, nlines); + ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count); + // And still + ASSERT_FALSE( + reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)); + ASSERT_OK(reader->GetStatus()); + ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count); + } + + // Verify with injected I/O error + { + std::unique_ptr<LineFileReader> reader; + ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader, + nullptr /* dbg */, + nullptr /* rate_limiter */)); + std::string line; + int count = 0; + // Read part way through the file + while (count < nlines / 4) { + ASSERT_TRUE( + reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)); + ASSERT_EQ(line, GenerateLine(count)); + ++count; + ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count); + } + ASSERT_OK(reader->GetStatus()); + + // Inject error + int callback_count = 0; + SyncPoint::GetInstance()->SetCallBack( + "MemFile::Read:IOStatus", [&](void* arg) { + IOStatus* status = static_cast<IOStatus*>(arg); + *status = IOStatus::Corruption("test"); + ++callback_count; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { + ASSERT_EQ(line, GenerateLine(count)); + ++count; + ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count); + } + ASSERT_TRUE(reader->GetStatus().IsCorruption()); + ASSERT_LT(count, nlines / 2); + ASSERT_EQ(callback_count, 1); + + // Still get error & no retry + ASSERT_FALSE( + reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)); + ASSERT_TRUE(reader->GetStatus().IsCorruption()); + ASSERT_EQ(callback_count, 1); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + } +} + +#ifndef ROCKSDB_LITE +class IOErrorEventListener : public EventListener { + public: + IOErrorEventListener() { notify_error_.store(0); } + + void OnIOError(const IOErrorInfo& io_error_info) override { + notify_error_++; + EXPECT_FALSE(io_error_info.file_path.empty()); + EXPECT_FALSE(io_error_info.io_status.ok()); + } + + size_t NotifyErrorCount() { return notify_error_; } + + bool ShouldBeNotifiedOnFileIO() override { return true; } + + private: + std::atomic<size_t> notify_error_; +}; + +TEST_F(DBWritableFileWriterTest, IOErrorNotification) { + class FakeWF : public FSWritableFile { + public: + explicit FakeWF() : io_error_(false) { + file_append_errors_.store(0); + file_flush_errors_.store(0); + } + + using FSWritableFile::Append; + IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + file_append_errors_++; + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + + using FSWritableFile::PositionedAppend; + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + if (io_error_) { + file_flush_errors_++; + return IOStatus::IOError("Fake IO error"); + } + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + void SetIOError(bool val) { io_error_ = val; } + + void CheckCounters(int file_append_errors, int file_flush_errors) { + ASSERT_EQ(file_append_errors, file_append_errors_); + ASSERT_EQ(file_flush_errors_, file_flush_errors); + } + + protected: + bool io_error_; + std::atomic<size_t> file_append_errors_; + std::atomic<size_t> file_flush_errors_; + }; + + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + IOErrorEventListener* listener = new IOErrorEventListener(); + options.listeners.emplace_back(listener); + + DestroyAndReopen(options); + ImmutableOptions ioptions(options); + + std::string fname = dbname_ + "/test_file"; + std::unique_ptr<FakeWF> writable_file_ptr(new FakeWF); + + std::unique_ptr<WritableFileWriter> file_writer; + writable_file_ptr->SetIOError(true); + + file_writer.reset(new WritableFileWriter( + std::move(writable_file_ptr), fname, file_options, + SystemClock::Default().get(), nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + FakeWF* fwf = static_cast<FakeWF*>(file_writer->writable_file()); + + fwf->SetIOError(true); + ASSERT_NOK(file_writer->Append(std::string(2 * kMb, 'a'))); + fwf->CheckCounters(1, 0); + ASSERT_EQ(listener->NotifyErrorCount(), 1); + + file_writer->reset_seen_error(); + fwf->SetIOError(true); + ASSERT_NOK(file_writer->Flush()); + fwf->CheckCounters(1, 1); + ASSERT_EQ(listener->NotifyErrorCount(), 2); + + /* No error generation */ + file_writer->reset_seen_error(); + fwf->SetIOError(false); + ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b'))); + ASSERT_EQ(listener->NotifyErrorCount(), 2); + fwf->CheckCounters(1, 1); +} +#endif // ROCKSDB_LITE + +class WritableFileWriterIOPriorityTest : public testing::Test { + protected: + // This test is to check whether the rate limiter priority can be passed + // correctly from WritableFileWriter functions to FSWritableFile functions. + + void SetUp() override { + // When op_rate_limiter_priority parameter in WritableFileWriter functions + // is the default (Env::IO_TOTAL). + std::unique_ptr<FakeWF> wf{new FakeWF(Env::IO_HIGH)}; + FileOptions file_options; + writer_.reset(new WritableFileWriter(std::move(wf), "" /* don't care */, + file_options)); + } + + class FakeWF : public FSWritableFile { + public: + explicit FakeWF(Env::IOPriority io_priority) { SetIOPriority(io_priority); } + ~FakeWF() override {} + + IOStatus Append(const Slice& /*data*/, const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Append(const Slice& data, const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* dbg) override { + return Append(data, options, dbg); + } + IOStatus PositionedAppend(const Slice& /*data*/, uint64_t /*offset*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus PositionedAppend( + const Slice& /* data */, uint64_t /* offset */, + const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Truncate(uint64_t /*size*/, const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Close(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Flush(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Sync(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus Fsync(const IOOptions& options, IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + uint64_t GetFileSize(const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return 0; + } + void GetPreallocationStatus(size_t* /*block_size*/, + size_t* /*last_allocated_block*/) override {} + size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { + return 0; + } + IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + return IOStatus::OK(); + } + + IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + return IOStatus::OK(); + } + + void PrepareWrite(size_t /*offset*/, size_t /*len*/, + const IOOptions& options, + IODebugContext* /*dbg*/) override { + EXPECT_EQ(options.rate_limiter_priority, io_priority_); + } + + bool IsSyncThreadSafe() const override { return true; } + }; + + std::unique_ptr<WritableFileWriter> writer_; +}; + +TEST_F(WritableFileWriterIOPriorityTest, Append) { + ASSERT_OK(writer_->Append(Slice("abc"))); +} + +TEST_F(WritableFileWriterIOPriorityTest, Pad) { ASSERT_OK(writer_->Pad(500)); } + +TEST_F(WritableFileWriterIOPriorityTest, Flush) { ASSERT_OK(writer_->Flush()); } + +TEST_F(WritableFileWriterIOPriorityTest, Close) { ASSERT_OK(writer_->Close()); } + +TEST_F(WritableFileWriterIOPriorityTest, Sync) { + ASSERT_OK(writer_->Sync(false)); + ASSERT_OK(writer_->Sync(true)); +} + +TEST_F(WritableFileWriterIOPriorityTest, SyncWithoutFlush) { + ASSERT_OK(writer_->SyncWithoutFlush(false)); + ASSERT_OK(writer_->SyncWithoutFlush(true)); +} + +TEST_F(WritableFileWriterIOPriorityTest, BasicOp) { + EnvOptions env_options; + env_options.bytes_per_sync = kMb; + std::unique_ptr<FakeWF> wf(new FakeWF(Env::IO_HIGH)); + std::unique_ptr<WritableFileWriter> writer( + new WritableFileWriter(std::move(wf), "" /* don't care */, env_options)); + Random r(301); + Status s; + std::unique_ptr<char[]> large_buf(new char[10 * kMb]); + for (int i = 0; i < 1000; i++) { + int skew_limit = (i < 700) ? 10 : 15; + uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100); + s = writer->Append(Slice(large_buf.get(), num)); + ASSERT_OK(s); + + // Flush in a chance of 1/10. + if (r.Uniform(10) == 0) { + s = writer->Flush(); + ASSERT_OK(s); + } + } + s = writer->Close(); + ASSERT_OK(s); +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |