summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/blob/blob_file_reader_test.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/blob/blob_file_reader_test.cc1024
1 files changed, 1024 insertions, 0 deletions
diff --git a/src/rocksdb/db/blob/blob_file_reader_test.cc b/src/rocksdb/db/blob/blob_file_reader_test.cc
new file mode 100644
index 000000000..03458e2b5
--- /dev/null
+++ b/src/rocksdb/db/blob/blob_file_reader_test.cc
@@ -0,0 +1,1024 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_file_reader.h"
+
+#include <cassert>
+#include <string>
+
+#include "db/blob/blob_contents.h"
+#include "db/blob/blob_log_format.h"
+#include "db/blob/blob_log_writer.h"
+#include "env/mock_env.h"
+#include "file/filename.h"
+#include "file/read_write_util.h"
+#include "file/writable_file_writer.h"
+#include "options/cf_options.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/options.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+#include "util/compression.h"
+#include "utilities/fault_injection_env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+// Creates a test blob file with `num` blobs in it.
+void WriteBlobFile(const ImmutableOptions& immutable_options,
+ uint32_t column_family_id, bool has_ttl,
+ const ExpirationRange& expiration_range_header,
+ const ExpirationRange& expiration_range_footer,
+ uint64_t blob_file_number, const std::vector<Slice>& keys,
+ const std::vector<Slice>& blobs, CompressionType compression,
+ std::vector<uint64_t>& blob_offsets,
+ std::vector<uint64_t>& blob_sizes) {
+ assert(!immutable_options.cf_paths.empty());
+ size_t num = keys.size();
+ assert(num == blobs.size());
+ assert(num == blob_offsets.size());
+ assert(num == blob_sizes.size());
+
+ const std::string blob_file_path =
+ BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
+ std::unique_ptr<FSWritableFile> file;
+ ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
+ FileOptions()));
+
+ std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(file), blob_file_path, FileOptions(), immutable_options.clock));
+
+ constexpr Statistics* statistics = nullptr;
+ constexpr bool use_fsync = false;
+ constexpr bool do_flush = false;
+
+ BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock,
+ statistics, blob_file_number, use_fsync,
+ do_flush);
+
+ BlobLogHeader header(column_family_id, compression, has_ttl,
+ expiration_range_header);
+
+ ASSERT_OK(blob_log_writer.WriteHeader(header));
+
+ std::vector<std::string> compressed_blobs(num);
+ std::vector<Slice> blobs_to_write(num);
+ if (kNoCompression == compression) {
+ for (size_t i = 0; i < num; ++i) {
+ blobs_to_write[i] = blobs[i];
+ blob_sizes[i] = blobs[i].size();
+ }
+ } else {
+ CompressionOptions opts;
+ CompressionContext context(compression);
+ constexpr uint64_t sample_for_compression = 0;
+ CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
+ compression, sample_for_compression);
+
+ constexpr uint32_t compression_format_version = 2;
+
+ for (size_t i = 0; i < num; ++i) {
+ ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version,
+ &compressed_blobs[i]));
+ blobs_to_write[i] = compressed_blobs[i];
+ blob_sizes[i] = compressed_blobs[i].size();
+ }
+ }
+
+ for (size_t i = 0; i < num; ++i) {
+ uint64_t key_offset = 0;
+ ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset,
+ &blob_offsets[i]));
+ }
+
+ BlobLogFooter footer;
+ footer.blob_count = num;
+ footer.expiration_range = expiration_range_footer;
+
+ std::string checksum_method;
+ std::string checksum_value;
+ ASSERT_OK(
+ blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
+}
+
+// Creates a test blob file with a single blob in it. Note: this method
+// makes it possible to test various corner cases by allowing the caller
+// to specify the contents of various blob file header/footer fields.
+void WriteBlobFile(const ImmutableOptions& immutable_options,
+ uint32_t column_family_id, bool has_ttl,
+ const ExpirationRange& expiration_range_header,
+ const ExpirationRange& expiration_range_footer,
+ uint64_t blob_file_number, const Slice& key,
+ const Slice& blob, CompressionType compression,
+ uint64_t* blob_offset, uint64_t* blob_size) {
+ std::vector<Slice> keys{key};
+ std::vector<Slice> blobs{blob};
+ std::vector<uint64_t> blob_offsets{0};
+ std::vector<uint64_t> blob_sizes{0};
+ WriteBlobFile(immutable_options, column_family_id, has_ttl,
+ expiration_range_header, expiration_range_footer,
+ blob_file_number, keys, blobs, compression, blob_offsets,
+ blob_sizes);
+ if (blob_offset) {
+ *blob_offset = blob_offsets[0];
+ }
+ if (blob_size) {
+ *blob_size = blob_sizes[0];
+ }
+}
+
+} // anonymous namespace
+
+class BlobFileReaderTest : public testing::Test {
+ protected:
+ BlobFileReaderTest() { mock_env_.reset(MockEnv::Create(Env::Default())); }
+ std::unique_ptr<Env> mock_env_;
+};
+
+TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(),
+ "BlobFileReaderTest_CreateReaderAndGetBlob"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr size_t num_blobs = 3;
+ const std::vector<std::string> key_strs = {"key1", "key2", "key3"};
+ const std::vector<std::string> blob_strs = {"blob1", "blob2", "blob3"};
+
+ const std::vector<Slice> keys = {key_strs[0], key_strs[1], key_strs[2]};
+ const std::vector<Slice> blobs = {blob_strs[0], blob_strs[1], blob_strs[2]};
+
+ std::vector<uint64_t> blob_offsets(keys.size());
+ std::vector<uint64_t> blob_sizes(keys.size());
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, keys, blobs, kNoCompression,
+ blob_offsets, blob_sizes);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_OK(BlobFileReader::Create(
+ immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/, &reader));
+
+ // Make sure the blob can be retrieved with and without checksum verification
+ ReadOptions read_options;
+ read_options.verify_checksums = false;
+
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr MemoryAllocator* allocator = nullptr;
+
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
+ blob_sizes[0], kNoCompression, prefetch_buffer,
+ allocator, &value, &bytes_read));
+ ASSERT_NE(value, nullptr);
+ ASSERT_EQ(value->data(), blobs[0]);
+ ASSERT_EQ(bytes_read, blob_sizes[0]);
+
+ // MultiGetBlob
+ bytes_read = 0;
+ size_t total_size = 0;
+
+ std::array<Status, num_blobs> statuses_buf;
+ std::array<BlobReadRequest, num_blobs> requests_buf;
+ autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
+ blob_reqs;
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ requests_buf[i] =
+ BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
+ kNoCompression, nullptr, &statuses_buf[i]);
+ blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
+ }
+
+ reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ const auto& result = blob_reqs[i].second;
+
+ ASSERT_OK(statuses_buf[i]);
+ ASSERT_NE(result, nullptr);
+ ASSERT_EQ(result->data(), blobs[i]);
+ total_size += blob_sizes[i];
+ }
+ ASSERT_EQ(bytes_read, total_size);
+ }
+
+ read_options.verify_checksums = true;
+
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
+ blob_sizes[1], kNoCompression, prefetch_buffer,
+ allocator, &value, &bytes_read));
+ ASSERT_NE(value, nullptr);
+ ASSERT_EQ(value->data(), blobs[1]);
+
+ const uint64_t key_size = keys[1].size();
+ ASSERT_EQ(bytes_read,
+ BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
+ blob_sizes[1]);
+ }
+
+ // Invalid offset (too close to start of file)
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
+ blob_sizes[0], kNoCompression, prefetch_buffer,
+ allocator, &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+ }
+
+ // Invalid offset (too close to end of file)
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
+ blob_sizes[2], kNoCompression, prefetch_buffer,
+ allocator, &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+ }
+
+ // Incorrect compression type
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(read_options, keys[0], blob_offsets[0],
+ blob_sizes[0], kZSTD, prefetch_buffer, allocator,
+ &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+ }
+
+ // Incorrect key size
+ {
+ constexpr char shorter_key[] = "k";
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(read_options, shorter_key,
+ blob_offsets[0] -
+ (keys[0].size() - sizeof(shorter_key) + 1),
+ blob_sizes[0], kNoCompression, prefetch_buffer,
+ allocator, &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+
+ // MultiGetBlob
+ autovector<std::reference_wrapper<const Slice>> key_refs;
+ for (const auto& key_ref : keys) {
+ key_refs.emplace_back(std::cref(key_ref));
+ }
+ Slice shorter_key_slice(shorter_key, sizeof(shorter_key) - 1);
+ key_refs[1] = std::cref(shorter_key_slice);
+
+ autovector<uint64_t> offsets{
+ blob_offsets[0],
+ blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
+ blob_offsets[2]};
+
+ std::array<Status, num_blobs> statuses_buf;
+ std::array<BlobReadRequest, num_blobs> requests_buf;
+ autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
+ blob_reqs;
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ requests_buf[i] =
+ BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
+ kNoCompression, nullptr, &statuses_buf[i]);
+ blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
+ }
+
+ reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ if (i == 1) {
+ ASSERT_TRUE(statuses_buf[i].IsCorruption());
+ } else {
+ ASSERT_OK(statuses_buf[i]);
+ }
+ }
+ }
+
+ // Incorrect key
+ {
+ constexpr char incorrect_key[] = "foo1";
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(read_options, incorrect_key, blob_offsets[0],
+ blob_sizes[0], kNoCompression, prefetch_buffer,
+ allocator, &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+
+ // MultiGetBlob
+ autovector<std::reference_wrapper<const Slice>> key_refs;
+ for (const auto& key_ref : keys) {
+ key_refs.emplace_back(std::cref(key_ref));
+ }
+ Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
+ key_refs[2] = std::cref(wrong_key_slice);
+
+ std::array<Status, num_blobs> statuses_buf;
+ std::array<BlobReadRequest, num_blobs> requests_buf;
+ autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
+ blob_reqs;
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ requests_buf[i] =
+ BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
+ kNoCompression, nullptr, &statuses_buf[i]);
+ blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
+ }
+
+ reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ if (i == num_blobs - 1) {
+ ASSERT_TRUE(statuses_buf[i].IsCorruption());
+ } else {
+ ASSERT_OK(statuses_buf[i]);
+ }
+ }
+ }
+
+ // Incorrect value size
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(read_options, keys[1], blob_offsets[1],
+ blob_sizes[1] + 1, kNoCompression,
+ prefetch_buffer, allocator, &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+
+ // MultiGetBlob
+ autovector<std::reference_wrapper<const Slice>> key_refs;
+ for (const auto& key_ref : keys) {
+ key_refs.emplace_back(std::cref(key_ref));
+ }
+
+ std::array<Status, num_blobs> statuses_buf;
+ std::array<BlobReadRequest, num_blobs> requests_buf;
+
+ requests_buf[0] =
+ BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
+ kNoCompression, nullptr, &statuses_buf[0]);
+ requests_buf[1] =
+ BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
+ kNoCompression, nullptr, &statuses_buf[1]);
+ requests_buf[2] =
+ BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
+ kNoCompression, nullptr, &statuses_buf[2]);
+
+ autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
+ blob_reqs;
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
+ }
+
+ reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
+
+ for (size_t i = 0; i < num_blobs; ++i) {
+ if (i != 1) {
+ ASSERT_OK(statuses_buf[i]);
+ } else {
+ ASSERT_TRUE(statuses_buf[i].IsCorruption());
+ }
+ }
+ }
+}
+
+TEST_F(BlobFileReaderTest, Malformed) {
+ // Write a blob file consisting of nothing but a header, and make sure we
+ // detect the error when we open it for reading
+
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_Malformed"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr uint64_t blob_file_number = 1;
+
+ {
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+
+ const std::string blob_file_path =
+ BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
+
+ std::unique_ptr<FSWritableFile> file;
+ ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
+ FileOptions()));
+
+ std::unique_ptr<WritableFileWriter> file_writer(
+ new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
+ immutable_options.clock));
+
+ constexpr Statistics* statistics = nullptr;
+ constexpr bool use_fsync = false;
+ constexpr bool do_flush = false;
+
+ BlobLogWriter blob_log_writer(std::move(file_writer),
+ immutable_options.clock, statistics,
+ blob_file_number, use_fsync, do_flush);
+
+ BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
+ expiration_range);
+
+ ASSERT_OK(blob_log_writer.WriteHeader(header));
+ }
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
+ column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/,
+ &reader)
+ .IsCorruption());
+}
+
+TEST_F(BlobFileReaderTest, TTL) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_TTL"), 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = true;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob, kNoCompression,
+ &blob_offset, &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
+ column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/,
+ &reader)
+ .IsCorruption());
+}
+
+TEST_F(BlobFileReaderTest, ExpirationRangeInHeader) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(),
+ "BlobFileReaderTest_ExpirationRangeInHeader"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ const ExpirationRange expiration_range_header(
+ 1, 2); // can be made constexpr when we adopt C++14
+ constexpr ExpirationRange expiration_range_footer;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl,
+ expiration_range_header, expiration_range_footer,
+ blob_file_number, key, blob, kNoCompression, &blob_offset,
+ &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
+ column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/,
+ &reader)
+ .IsCorruption());
+}
+
+TEST_F(BlobFileReaderTest, ExpirationRangeInFooter) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(),
+ "BlobFileReaderTest_ExpirationRangeInFooter"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range_header;
+ const ExpirationRange expiration_range_footer(
+ 1, 2); // can be made constexpr when we adopt C++14
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl,
+ expiration_range_header, expiration_range_footer,
+ blob_file_number, key, blob, kNoCompression, &blob_offset,
+ &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
+ column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/,
+ &reader)
+ .IsCorruption());
+}
+
+TEST_F(BlobFileReaderTest, IncorrectColumnFamily) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(),
+ "BlobFileReaderTest_IncorrectColumnFamily"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob, kNoCompression,
+ &blob_offset, &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ constexpr uint32_t incorrect_column_family_id = 2;
+
+ ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
+ incorrect_column_family_id,
+ blob_file_read_hist, blob_file_number,
+ nullptr /*IOTracer*/, &reader)
+ .IsCorruption());
+}
+
+TEST_F(BlobFileReaderTest, BlobCRCError) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_BlobCRCError"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob, kNoCompression,
+ &blob_offset, &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_OK(BlobFileReader::Create(
+ immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/, &reader));
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobFileReader::VerifyBlob:CheckBlobCRC", [](void* arg) {
+ BlobLogRecord* const record = static_cast<BlobLogRecord*>(arg);
+ assert(record);
+
+ record->blob_crc = 0xfaceb00c;
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr MemoryAllocator* allocator = nullptr;
+
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
+ kNoCompression, prefetch_buffer, allocator, &value,
+ &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_F(BlobFileReaderTest, Compression) {
+ if (!Snappy_Supported()) {
+ return;
+ }
+
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_Compression"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob,
+ kSnappyCompression, &blob_offset, &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_OK(BlobFileReader::Create(
+ immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/, &reader));
+
+ // Make sure the blob can be retrieved with and without checksum verification
+ ReadOptions read_options;
+ read_options.verify_checksums = false;
+
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr MemoryAllocator* allocator = nullptr;
+
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
+ kSnappyCompression, prefetch_buffer, allocator,
+ &value, &bytes_read));
+ ASSERT_NE(value, nullptr);
+ ASSERT_EQ(value->data(), blob);
+ ASSERT_EQ(bytes_read, blob_size);
+ }
+
+ read_options.verify_checksums = true;
+
+ {
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
+ kSnappyCompression, prefetch_buffer, allocator,
+ &value, &bytes_read));
+ ASSERT_NE(value, nullptr);
+ ASSERT_EQ(value->data(), blob);
+
+ constexpr uint64_t key_size = sizeof(key) - 1;
+ ASSERT_EQ(bytes_read,
+ BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
+ blob_size);
+ }
+}
+
+TEST_F(BlobFileReaderTest, UncompressionError) {
+ if (!Snappy_Supported()) {
+ return;
+ }
+
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(),
+ "BlobFileReaderTest_UncompressionError"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob,
+ kSnappyCompression, &blob_offset, &blob_size);
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ ASSERT_OK(BlobFileReader::Create(
+ immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/, &reader));
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", [](void* arg) {
+ CacheAllocationPtr* const output =
+ static_cast<CacheAllocationPtr*>(arg);
+ assert(output);
+
+ output->reset();
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr MemoryAllocator* allocator = nullptr;
+
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
+ kSnappyCompression, prefetch_buffer, allocator,
+ &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+class BlobFileReaderIOErrorTest
+ : public testing::Test,
+ public testing::WithParamInterface<std::string> {
+ protected:
+ BlobFileReaderIOErrorTest() : sync_point_(GetParam()) {
+ mock_env_.reset(MockEnv::Create(Env::Default()));
+ fault_injection_env_.reset(new FaultInjectionTestEnv(mock_env_.get()));
+ }
+
+ std::unique_ptr<Env> mock_env_;
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
+ std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderIOErrorTest,
+ ::testing::ValuesIn(std::vector<std::string>{
+ "BlobFileReader::OpenFile:GetFileSize",
+ "BlobFileReader::OpenFile:NewRandomAccessFile",
+ "BlobFileReader::ReadHeader:ReadFromFile",
+ "BlobFileReader::ReadFooter:ReadFromFile",
+ "BlobFileReader::GetBlob:ReadFromFile"}));
+
+TEST_P(BlobFileReaderIOErrorTest, IOError) {
+ // Simulates an I/O error during the specified step
+
+ Options options;
+ options.env = fault_injection_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(fault_injection_env_.get(),
+ "BlobFileReaderIOErrorTest_IOError"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob, kNoCompression,
+ &blob_offset, &blob_size);
+
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+ fault_injection_env_->SetFilesystemActive(false,
+ Status::IOError(sync_point_));
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ const Status s = BlobFileReader::Create(
+ immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/, &reader);
+
+ const bool fail_during_create =
+ (sync_point_ != "BlobFileReader::GetBlob:ReadFromFile");
+
+ if (fail_during_create) {
+ ASSERT_TRUE(s.IsIOError());
+ } else {
+ ASSERT_OK(s);
+
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr MemoryAllocator* allocator = nullptr;
+
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
+ kNoCompression, prefetch_buffer, allocator,
+ &value, &bytes_read)
+ .IsIOError());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+ }
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+class BlobFileReaderDecodingErrorTest
+ : public testing::Test,
+ public testing::WithParamInterface<std::string> {
+ protected:
+ BlobFileReaderDecodingErrorTest() : sync_point_(GetParam()) {
+ mock_env_.reset(MockEnv::Create(Env::Default()));
+ }
+
+ std::unique_ptr<Env> mock_env_;
+ std::string sync_point_;
+};
+
+INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderDecodingErrorTest,
+ ::testing::ValuesIn(std::vector<std::string>{
+ "BlobFileReader::ReadHeader:TamperWithResult",
+ "BlobFileReader::ReadFooter:TamperWithResult",
+ "BlobFileReader::GetBlob:TamperWithResult"}));
+
+TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
+ Options options;
+ options.env = mock_env_.get();
+ options.cf_paths.emplace_back(
+ test::PerThreadDBPath(mock_env_.get(),
+ "BlobFileReaderDecodingErrorTest_DecodingError"),
+ 0);
+ options.enable_blob_files = true;
+
+ ImmutableOptions immutable_options(options);
+
+ constexpr uint32_t column_family_id = 1;
+ constexpr bool has_ttl = false;
+ constexpr ExpirationRange expiration_range;
+ constexpr uint64_t blob_file_number = 1;
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ uint64_t blob_offset = 0;
+ uint64_t blob_size = 0;
+
+ WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+ expiration_range, blob_file_number, key, blob, kNoCompression,
+ &blob_offset, &blob_size);
+
+ SyncPoint::GetInstance()->SetCallBack(sync_point_, [](void* arg) {
+ Slice* const slice = static_cast<Slice*>(arg);
+ assert(slice);
+ assert(!slice->empty());
+
+ slice->remove_prefix(1);
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+ std::unique_ptr<BlobFileReader> reader;
+
+ const Status s = BlobFileReader::Create(
+ immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
+ blob_file_number, nullptr /*IOTracer*/, &reader);
+
+ const bool fail_during_create =
+ sync_point_ != "BlobFileReader::GetBlob:TamperWithResult";
+
+ if (fail_during_create) {
+ ASSERT_TRUE(s.IsCorruption());
+ } else {
+ ASSERT_OK(s);
+
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr MemoryAllocator* allocator = nullptr;
+
+ std::unique_ptr<BlobContents> value;
+ uint64_t bytes_read = 0;
+
+ ASSERT_TRUE(reader
+ ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
+ kNoCompression, prefetch_buffer, allocator,
+ &value, &bytes_read)
+ .IsCorruption());
+ ASSERT_EQ(value, nullptr);
+ ASSERT_EQ(bytes_read, 0);
+ }
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}