summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/blob_db/blob_log_reader.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_reader.cc105
1 files changed, 105 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_log_reader.cc b/src/rocksdb/utilities/blob_db/blob_log_reader.cc
new file mode 100644
index 000000000..1a4b5ac81
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_reader.cc
@@ -0,0 +1,105 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_log_reader.h"
+
+#include <algorithm>
+
+#include "file/random_access_file_reader.h"
+#include "monitoring/statistics.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+Reader::Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
+ Statistics* statistics)
+ : file_(std::move(file_reader)),
+ env_(env),
+ statistics_(statistics),
+ buffer_(),
+ next_byte_(0) {}
+
+Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
+ StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+ Status s = file_->Read(next_byte_, static_cast<size_t>(size), slice, buf);
+ next_byte_ += size;
+ if (!s.ok()) {
+ return s;
+ }
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
+ if (slice->size() != size) {
+ return Status::Corruption("EOF reached while reading record");
+ }
+ return s;
+}
+
+Status Reader::ReadHeader(BlobLogHeader* header) {
+ assert(file_.get() != nullptr);
+ assert(next_byte_ == 0);
+ Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (buffer_.size() != BlobLogHeader::kSize) {
+ return Status::Corruption("EOF reached before file header");
+ }
+
+ return header->DecodeFrom(buffer_);
+}
+
+Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
+ uint64_t* blob_offset) {
+ Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
+ if (!s.ok()) {
+ return s;
+ }
+ if (buffer_.size() != BlobLogRecord::kHeaderSize) {
+ return Status::Corruption("EOF reached before record header");
+ }
+
+ s = record->DecodeHeaderFrom(buffer_);
+ if (!s.ok()) {
+ return s;
+ }
+
+ uint64_t kb_size = record->key_size + record->value_size;
+ if (blob_offset != nullptr) {
+ *blob_offset = next_byte_ + record->key_size;
+ }
+
+ switch (level) {
+ case kReadHeader:
+ next_byte_ += kb_size;
+ break;
+
+ case kReadHeaderKey:
+ record->key_buf.reset(new char[record->key_size]);
+ s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
+ next_byte_ += record->value_size;
+ break;
+
+ case kReadHeaderKeyBlob:
+ record->key_buf.reset(new char[record->key_size]);
+ s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
+ if (s.ok()) {
+ record->value_buf.reset(new char[record->value_size]);
+ s = ReadSlice(record->value_size, &record->value,
+ record->value_buf.get());
+ }
+ if (s.ok()) {
+ s = record->CheckBlobCRC();
+ }
+ break;
+ }
+ return s;
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE