summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/blob/blob_file_reader.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/blob/blob_file_reader.cc610
1 files changed, 610 insertions, 0 deletions
diff --git a/src/rocksdb/db/blob/blob_file_reader.cc b/src/rocksdb/db/blob/blob_file_reader.cc
new file mode 100644
index 000000000..a4eabb605
--- /dev/null
+++ b/src/rocksdb/db/blob/blob_file_reader.cc
@@ -0,0 +1,610 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/blob/blob_file_reader.h"
+
+#include <cassert>
+#include <string>
+
+#include "db/blob/blob_contents.h"
+#include "db/blob/blob_log_format.h"
+#include "file/file_prefetch_buffer.h"
+#include "file/filename.h"
+#include "monitoring/statistics.h"
+#include "options/cf_options.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "table/multiget_context.h"
+#include "test_util/sync_point.h"
+#include "util/compression.h"
+#include "util/crc32c.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+Status BlobFileReader::Create(
+ const ImmutableOptions& immutable_options, const FileOptions& file_options,
+ uint32_t column_family_id, HistogramImpl* blob_file_read_hist,
+ uint64_t blob_file_number, const std::shared_ptr<IOTracer>& io_tracer,
+ std::unique_ptr<BlobFileReader>* blob_file_reader) {
+ assert(blob_file_reader);
+ assert(!*blob_file_reader);
+
+ uint64_t file_size = 0;
+ std::unique_ptr<RandomAccessFileReader> file_reader;
+
+ {
+ const Status s =
+ OpenFile(immutable_options, file_options, blob_file_read_hist,
+ blob_file_number, io_tracer, &file_size, &file_reader);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ assert(file_reader);
+
+ Statistics* const statistics = immutable_options.stats;
+
+ CompressionType compression_type = kNoCompression;
+
+ {
+ const Status s = ReadHeader(file_reader.get(), column_family_id, statistics,
+ &compression_type);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ {
+ const Status s = ReadFooter(file_reader.get(), file_size, statistics);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ blob_file_reader->reset(
+ new BlobFileReader(std::move(file_reader), file_size, compression_type,
+ immutable_options.clock, statistics));
+
+ return Status::OK();
+}
+
+Status BlobFileReader::OpenFile(
+ const ImmutableOptions& immutable_options, const FileOptions& file_opts,
+ HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
+ const std::shared_ptr<IOTracer>& io_tracer, uint64_t* file_size,
+ std::unique_ptr<RandomAccessFileReader>* file_reader) {
+ assert(file_size);
+ assert(file_reader);
+
+ const auto& cf_paths = immutable_options.cf_paths;
+ assert(!cf_paths.empty());
+
+ const std::string blob_file_path =
+ BlobFileName(cf_paths.front().path, blob_file_number);
+
+ FileSystem* const fs = immutable_options.fs.get();
+ assert(fs);
+
+ constexpr IODebugContext* dbg = nullptr;
+
+ {
+ TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
+
+ const Status s =
+ fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
+ return Status::Corruption("Malformed blob file");
+ }
+
+ std::unique_ptr<FSRandomAccessFile> file;
+
+ {
+ TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
+
+ const Status s =
+ fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ assert(file);
+
+ if (immutable_options.advise_random_on_open) {
+ file->Hint(FSRandomAccessFile::kRandom);
+ }
+
+ file_reader->reset(new RandomAccessFileReader(
+ std::move(file), blob_file_path, immutable_options.clock, io_tracer,
+ immutable_options.stats, BLOB_DB_BLOB_FILE_READ_MICROS,
+ blob_file_read_hist, immutable_options.rate_limiter.get(),
+ immutable_options.listeners));
+
+ return Status::OK();
+}
+
+Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
+ uint32_t column_family_id,
+ Statistics* statistics,
+ CompressionType* compression_type) {
+ assert(file_reader);
+ assert(compression_type);
+
+ Slice header_slice;
+ Buffer buf;
+ AlignedBuf aligned_buf;
+
+ {
+ TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
+
+ constexpr uint64_t read_offset = 0;
+ constexpr size_t read_size = BlobLogHeader::kSize;
+
+ // TODO: rate limit reading headers from blob files.
+ const Status s = ReadFromFile(file_reader, read_offset, read_size,
+ statistics, &header_slice, &buf, &aligned_buf,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ if (!s.ok()) {
+ return s;
+ }
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
+ &header_slice);
+ }
+
+ BlobLogHeader header;
+
+ {
+ const Status s = header.DecodeFrom(header_slice);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ constexpr ExpirationRange no_expiration_range;
+
+ if (header.has_ttl || header.expiration_range != no_expiration_range) {
+ return Status::Corruption("Unexpected TTL blob file");
+ }
+
+ if (header.column_family_id != column_family_id) {
+ return Status::Corruption("Column family ID mismatch");
+ }
+
+ *compression_type = header.compression;
+
+ return Status::OK();
+}
+
+Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
+ uint64_t file_size, Statistics* statistics) {
+ assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
+ assert(file_reader);
+
+ Slice footer_slice;
+ Buffer buf;
+ AlignedBuf aligned_buf;
+
+ {
+ TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
+
+ const uint64_t read_offset = file_size - BlobLogFooter::kSize;
+ constexpr size_t read_size = BlobLogFooter::kSize;
+
+ // TODO: rate limit reading footers from blob files.
+ const Status s = ReadFromFile(file_reader, read_offset, read_size,
+ statistics, &footer_slice, &buf, &aligned_buf,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ if (!s.ok()) {
+ return s;
+ }
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
+ &footer_slice);
+ }
+
+ BlobLogFooter footer;
+
+ {
+ const Status s = footer.DecodeFrom(footer_slice);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ constexpr ExpirationRange no_expiration_range;
+
+ if (footer.expiration_range != no_expiration_range) {
+ return Status::Corruption("Unexpected TTL blob file");
+ }
+
+ return Status::OK();
+}
+
+Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
+ uint64_t read_offset, size_t read_size,
+ Statistics* statistics, Slice* slice,
+ Buffer* buf, AlignedBuf* aligned_buf,
+ Env::IOPriority rate_limiter_priority) {
+ assert(slice);
+ assert(buf);
+ assert(aligned_buf);
+
+ assert(file_reader);
+
+ RecordTick(statistics, BLOB_DB_BLOB_FILE_BYTES_READ, read_size);
+
+ Status s;
+
+ if (file_reader->use_direct_io()) {
+ constexpr char* scratch = nullptr;
+
+ s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
+ aligned_buf, rate_limiter_priority);
+ } else {
+ buf->reset(new char[read_size]);
+ constexpr AlignedBuf* aligned_scratch = nullptr;
+
+ s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
+ buf->get(), aligned_scratch, rate_limiter_priority);
+ }
+
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (slice->size() != read_size) {
+ return Status::Corruption("Failed to read data from blob file");
+ }
+
+ return Status::OK();
+}
+
+BlobFileReader::BlobFileReader(
+ std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
+ CompressionType compression_type, SystemClock* clock,
+ Statistics* statistics)
+ : file_reader_(std::move(file_reader)),
+ file_size_(file_size),
+ compression_type_(compression_type),
+ clock_(clock),
+ statistics_(statistics) {
+ assert(file_reader_);
+}
+
+BlobFileReader::~BlobFileReader() = default;
+
+Status BlobFileReader::GetBlob(
+ const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
+ uint64_t value_size, CompressionType compression_type,
+ FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
+ std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
+ assert(result);
+
+ const uint64_t key_size = user_key.size();
+
+ if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
+ return Status::Corruption("Invalid blob offset");
+ }
+
+ if (compression_type != compression_type_) {
+ return Status::Corruption("Compression type mismatch when reading blob");
+ }
+
+ // Note: if verify_checksum is set, we read the entire blob record to be able
+ // to perform the verification; otherwise, we just read the blob itself. Since
+ // the offset in BlobIndex actually points to the blob value, we need to make
+ // an adjustment in the former case.
+ const uint64_t adjustment =
+ read_options.verify_checksums
+ ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
+ : 0;
+ assert(offset >= adjustment);
+
+ const uint64_t record_offset = offset - adjustment;
+ const uint64_t record_size = value_size + adjustment;
+
+ Slice record_slice;
+ Buffer buf;
+ AlignedBuf aligned_buf;
+
+ bool prefetched = false;
+
+ if (prefetch_buffer) {
+ Status s;
+ constexpr bool for_compaction = true;
+
+ prefetched = prefetch_buffer->TryReadFromCache(
+ IOOptions(), file_reader_.get(), record_offset,
+ static_cast<size_t>(record_size), &record_slice, &s,
+ read_options.rate_limiter_priority, for_compaction);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (!prefetched) {
+ TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
+ PERF_COUNTER_ADD(blob_read_count, 1);
+ PERF_COUNTER_ADD(blob_read_byte, record_size);
+ PERF_TIMER_GUARD(blob_read_time);
+ const Status s = ReadFromFile(file_reader_.get(), record_offset,
+ static_cast<size_t>(record_size), statistics_,
+ &record_slice, &buf, &aligned_buf,
+ read_options.rate_limiter_priority);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
+ &record_slice);
+
+ if (read_options.verify_checksums) {
+ const Status s = VerifyBlob(record_slice, user_key, value_size);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ const Slice value_slice(record_slice.data() + adjustment, value_size);
+
+ {
+ const Status s = UncompressBlobIfNeeded(
+ value_slice, compression_type, allocator, clock_, statistics_, result);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (bytes_read) {
+ *bytes_read = record_size;
+ }
+
+ return Status::OK();
+}
+
+void BlobFileReader::MultiGetBlob(
+ const ReadOptions& read_options, MemoryAllocator* allocator,
+ autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>&
+ blob_reqs,
+ uint64_t* bytes_read) const {
+ const size_t num_blobs = blob_reqs.size();
+ assert(num_blobs > 0);
+ assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
+
+#ifndef NDEBUG
+ for (size_t i = 0; i < num_blobs - 1; ++i) {
+ assert(blob_reqs[i].first->offset <= blob_reqs[i + 1].first->offset);
+ }
+#endif // !NDEBUG
+
+ std::vector<FSReadRequest> read_reqs;
+ autovector<uint64_t> adjustments;
+ uint64_t total_len = 0;
+ read_reqs.reserve(num_blobs);
+ for (size_t i = 0; i < num_blobs; ++i) {
+ BlobReadRequest* const req = blob_reqs[i].first;
+ assert(req);
+ assert(req->user_key);
+ assert(req->status);
+
+ const size_t key_size = req->user_key->size();
+ const uint64_t offset = req->offset;
+ const uint64_t value_size = req->len;
+
+ if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
+ *req->status = Status::Corruption("Invalid blob offset");
+ continue;
+ }
+ if (req->compression != compression_type_) {
+ *req->status =
+ Status::Corruption("Compression type mismatch when reading a blob");
+ continue;
+ }
+
+ const uint64_t adjustment =
+ read_options.verify_checksums
+ ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
+ : 0;
+ assert(req->offset >= adjustment);
+ adjustments.push_back(adjustment);
+
+ FSReadRequest read_req = {};
+ read_req.offset = req->offset - adjustment;
+ read_req.len = req->len + adjustment;
+ read_reqs.emplace_back(read_req);
+ total_len += read_req.len;
+ }
+
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
+
+ Buffer buf;
+ AlignedBuf aligned_buf;
+
+ Status s;
+ bool direct_io = file_reader_->use_direct_io();
+ if (direct_io) {
+ for (size_t i = 0; i < read_reqs.size(); ++i) {
+ read_reqs[i].scratch = nullptr;
+ }
+ } else {
+ buf.reset(new char[total_len]);
+ std::ptrdiff_t pos = 0;
+ for (size_t i = 0; i < read_reqs.size(); ++i) {
+ read_reqs[i].scratch = buf.get() + pos;
+ pos += read_reqs[i].len;
+ }
+ }
+ TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
+ PERF_COUNTER_ADD(blob_read_count, num_blobs);
+ PERF_COUNTER_ADD(blob_read_byte, total_len);
+ s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
+ direct_io ? &aligned_buf : nullptr,
+ read_options.rate_limiter_priority);
+ if (!s.ok()) {
+ for (auto& req : read_reqs) {
+ req.status.PermitUncheckedError();
+ }
+ for (auto& blob_req : blob_reqs) {
+ BlobReadRequest* const req = blob_req.first;
+ assert(req);
+ assert(req->status);
+
+ if (!req->status->IsCorruption()) {
+ // Avoid overwriting corruption status.
+ *req->status = s;
+ }
+ }
+ return;
+ }
+
+ assert(s.ok());
+
+ uint64_t total_bytes = 0;
+ for (size_t i = 0, j = 0; i < num_blobs; ++i) {
+ BlobReadRequest* const req = blob_reqs[i].first;
+ assert(req);
+ assert(req->user_key);
+ assert(req->status);
+
+ if (!req->status->ok()) {
+ continue;
+ }
+
+ assert(j < read_reqs.size());
+ auto& read_req = read_reqs[j++];
+ const auto& record_slice = read_req.result;
+ if (read_req.status.ok() && record_slice.size() != read_req.len) {
+ read_req.status =
+ IOStatus::Corruption("Failed to read data from blob file");
+ }
+
+ *req->status = read_req.status;
+ if (!req->status->ok()) {
+ continue;
+ }
+
+ // Verify checksums if enabled
+ if (read_options.verify_checksums) {
+ *req->status = VerifyBlob(record_slice, *req->user_key, req->len);
+ if (!req->status->ok()) {
+ continue;
+ }
+ }
+
+ // Uncompress blob if needed
+ Slice value_slice(record_slice.data() + adjustments[i], req->len);
+ *req->status =
+ UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
+ clock_, statistics_, &blob_reqs[i].second);
+ if (req->status->ok()) {
+ total_bytes += record_slice.size();
+ }
+ }
+
+ if (bytes_read) {
+ *bytes_read = total_bytes;
+ }
+}
+
+Status BlobFileReader::VerifyBlob(const Slice& record_slice,
+ const Slice& user_key, uint64_t value_size) {
+ PERF_TIMER_GUARD(blob_checksum_time);
+
+ BlobLogRecord record;
+
+ const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
+
+ {
+ const Status s = record.DecodeHeaderFrom(header_slice);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (record.key_size != user_key.size()) {
+ return Status::Corruption("Key size mismatch when reading blob");
+ }
+
+ if (record.value_size != value_size) {
+ return Status::Corruption("Value size mismatch when reading blob");
+ }
+
+ record.key =
+ Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
+ if (record.key != user_key) {
+ return Status::Corruption("Key mismatch when reading blob");
+ }
+
+ record.value = Slice(record.key.data() + record.key_size, value_size);
+
+ {
+ TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
+ &record);
+
+ const Status s = record.CheckBlobCRC();
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ return Status::OK();
+}
+
+Status BlobFileReader::UncompressBlobIfNeeded(
+ const Slice& value_slice, CompressionType compression_type,
+ MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
+ std::unique_ptr<BlobContents>* result) {
+ assert(result);
+
+ if (compression_type == kNoCompression) {
+ CacheAllocationPtr allocation =
+ AllocateBlock(value_slice.size(), allocator);
+ memcpy(allocation.get(), value_slice.data(), value_slice.size());
+
+ *result = BlobContents::Create(std::move(allocation), value_slice.size());
+
+ return Status::OK();
+ }
+
+ UncompressionContext context(compression_type);
+ UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+ compression_type);
+
+ size_t uncompressed_size = 0;
+ constexpr uint32_t compression_format_version = 2;
+
+ CacheAllocationPtr output;
+
+ {
+ PERF_TIMER_GUARD(blob_decompress_time);
+ StopWatch stop_watch(clock, statistics, BLOB_DB_DECOMPRESSION_MICROS);
+ output = UncompressData(info, value_slice.data(), value_slice.size(),
+ &uncompressed_size, compression_format_version,
+ allocator);
+ }
+
+ TEST_SYNC_POINT_CALLBACK(
+ "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
+
+ if (!output) {
+ return Status::Corruption("Unable to uncompress blob");
+ }
+
+ *result = BlobContents::Create(std::move(output), uncompressed_size);
+
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE