diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/blob/blob_log_sequential_reader.cc | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/src/rocksdb/db/blob/blob_log_sequential_reader.cc b/src/rocksdb/db/blob/blob_log_sequential_reader.cc new file mode 100644 index 000000000..778725189 --- /dev/null +++ b/src/rocksdb/db/blob/blob_log_sequential_reader.cc @@ -0,0 +1,134 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include "db/blob/blob_log_sequential_reader.h" + +#include "file/random_access_file_reader.h" +#include "monitoring/statistics.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { + +BlobLogSequentialReader::BlobLogSequentialReader( + std::unique_ptr<RandomAccessFileReader>&& file_reader, SystemClock* clock, + Statistics* statistics) + : file_(std::move(file_reader)), + clock_(clock), + statistics_(statistics), + next_byte_(0) {} + +BlobLogSequentialReader::~BlobLogSequentialReader() = default; + +Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice, + char* buf) { + assert(slice); + assert(file_); + + StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); + // TODO: rate limit `BlobLogSequentialReader` reads (it appears unused?) + Status s = + file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size), slice, + buf, nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + next_byte_ += size; + if (!s.ok()) { + return s; + } + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size()); + if (slice->size() != size) { + return Status::Corruption("EOF reached while reading record"); + } + return s; +} + +Status BlobLogSequentialReader::ReadHeader(BlobLogHeader* header) { + assert(header); + assert(next_byte_ == 0); + + static_assert(BlobLogHeader::kSize <= sizeof(header_buf_), + "Buffer is smaller than BlobLogHeader::kSize"); + + Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_); + if (!s.ok()) { + return s; + } + + if (buffer_.size() != BlobLogHeader::kSize) { + return Status::Corruption("EOF reached before file header"); + } + + return header->DecodeFrom(buffer_); +} + +Status BlobLogSequentialReader::ReadRecord(BlobLogRecord* record, + ReadLevel level, + uint64_t* blob_offset) { + assert(record); + static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_), + "Buffer is smaller than BlobLogRecord::kHeaderSize"); + + Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_); + if (!s.ok()) { + return s; + } + if (buffer_.size() != BlobLogRecord::kHeaderSize) { + return Status::Corruption("EOF reached before record header"); + } + + s = record->DecodeHeaderFrom(buffer_); + if (!s.ok()) { + return s; + } + + uint64_t kb_size = record->key_size + record->value_size; + if (blob_offset != nullptr) { + *blob_offset = next_byte_ + record->key_size; + } + + switch (level) { + case kReadHeader: + next_byte_ += kb_size; + break; + + case kReadHeaderKey: + record->key_buf.reset(new char[record->key_size]); + s = ReadSlice(record->key_size, &record->key, record->key_buf.get()); + next_byte_ += record->value_size; + break; + + case kReadHeaderKeyBlob: + record->key_buf.reset(new char[record->key_size]); + s = ReadSlice(record->key_size, &record->key, record->key_buf.get()); + if (s.ok()) { + record->value_buf.reset(new char[record->value_size]); + s = ReadSlice(record->value_size, &record->value, + record->value_buf.get()); + } + if (s.ok()) { + s = record->CheckBlobCRC(); + } + break; + } + return s; +} + +Status BlobLogSequentialReader::ReadFooter(BlobLogFooter* footer) { + assert(footer); + static_assert(BlobLogFooter::kSize <= sizeof(header_buf_), + "Buffer is smaller than BlobLogFooter::kSize"); + + Status s = ReadSlice(BlobLogFooter::kSize, &buffer_, header_buf_); + if (!s.ok()) { + return s; + } + + if (buffer_.size() != BlobLogFooter::kSize) { + return Status::Corruption("EOF reached before file footer"); + } + + return footer->DecodeFrom(buffer_); +} + +} // namespace ROCKSDB_NAMESPACE |