summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/log_reader.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/log_reader.h')
-rw-r--r--src/rocksdb/db/log_reader.h225
1 files changed, 225 insertions, 0 deletions
diff --git a/src/rocksdb/db/log_reader.h b/src/rocksdb/db/log_reader.h
new file mode 100644
index 000000000..e3be1570e
--- /dev/null
+++ b/src/rocksdb/db/log_reader.h
@@ -0,0 +1,225 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <stdint.h>
+
+#include <memory>
+
+#include "db/log_format.h"
+#include "file/sequence_file_reader.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "util/compression.h"
+#include "util/xxhash.h"
+
+namespace ROCKSDB_NAMESPACE {
+class Logger;
+
+namespace log {
+
+/**
+ * Reader is a general purpose log stream reader implementation. The actual job
+ * of reading from the device is implemented by the SequentialFile interface.
+ *
+ * Please see Writer for details on the file and record layout.
+ */
+class Reader {
+ public:
+ // Interface for reporting errors.
+ class Reporter {
+ public:
+ virtual ~Reporter();
+
+ // Some corruption was detected. "bytes" is the approximate number
+ // of bytes dropped due to the corruption.
+ virtual void Corruption(size_t bytes, const Status& status) = 0;
+ };
+
+ // Create a reader that will return log records from "*file".
+ // "*file" must remain live while this Reader is in use (NOTE(review): it is
+ // held in the unique_ptr member file_, so the Reader presumably owns it).
+ // If "reporter" is non-nullptr, it is notified whenever some data is
+ // dropped due to a detected corruption. "*reporter" must remain
+ // live while this Reader is in use.
+ //
+ // If "checksum" is true, verify checksums if available.
+ Reader(std::shared_ptr<Logger> info_log,
+ std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
+ bool checksum, uint64_t log_num);
+ // No copying allowed
+ Reader(const Reader&) = delete;
+ void operator=(const Reader&) = delete;
+
+ virtual ~Reader();
+
+ // Read the next record into *record. Returns true if read
+ // successfully, false if we hit end of the input. May use
+ // "*scratch" as temporary storage. The contents filled in *record
+ // will only be valid until the next mutating operation on this
+ // reader or the next mutation to *scratch.
+ // If record_checksum is not nullptr, then this function will calculate the
+ // checksum of the record read and set record_checksum to it. The checksum is
+ // calculated from the original buffers that contain the contents of the
+ // record.
+ virtual bool ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords,
+ uint64_t* record_checksum = nullptr);
+
+ // Returns the physical offset of the last record returned by ReadRecord.
+ //
+ // Undefined before the first call to ReadRecord.
+ uint64_t LastRecordOffset();
+
+ // Returns the first physical offset after the last record returned by
+ // ReadRecord, or zero before first call to ReadRecord. This can also be
+ // thought of as the "current" position in processing the file bytes.
+ uint64_t LastRecordEnd();
+
+ // Returns true if the reader has encountered an EOF condition.
+ bool IsEOF() { return eof_; }
+
+ // Returns true if the reader has encountered a read error.
+ bool hasReadError() const { return read_error_; }
+
+ // When we know more data has been written to the file, we can use this
+ // function to force the reader to look again in the file.
+ // Also aligns the file position indicator to the start of the next block
+ // by reading the rest of the data from the EOF position to the end of the
+ // block that was partially read.
+ virtual void UnmarkEOF();
+
+ SequentialFileReader* file() { return file_.get(); } // Non-owning pointer
+
+ Reporter* GetReporter() const { return reporter_; }
+
+ uint64_t GetLogNumber() const { return log_number_; }
+
+ size_t GetReadOffset() const { // end_of_buffer_offset_ narrowed to size_t
+ return static_cast<size_t>(end_of_buffer_offset_);
+ }
+
+ bool IsCompressedAndEmptyFile() { // Compression record read, no first record
+ return !first_record_read_ && compression_type_record_read_;
+ }
+
+ protected:
+ std::shared_ptr<Logger> info_log_;
+ const std::unique_ptr<SequentialFileReader> file_;
+ Reporter* const reporter_;
+ bool const checksum_;
+ char* const backing_store_; // NOTE(review): presumably backs buffer_ -- confirm
+
+ // Internal state variables used for reading records
+ Slice buffer_;
+ bool eof_; // Last Read() indicated EOF by returning < kBlockSize
+ bool read_error_; // Error occurred while reading from file
+
+ // Offset of the file position indicator within the last block when an
+ // EOF was detected.
+ size_t eof_offset_;
+
+ // Offset of the last record returned by ReadRecord.
+ uint64_t last_record_offset_;
+ // Offset of the first location past the end of buffer_.
+ uint64_t end_of_buffer_offset_;
+
+ // Which log number this is
+ uint64_t const log_number_;
+
+ // Whether this is a recycled log file
+ bool recycled_;
+
+ // Whether the first record has been read or not.
+ bool first_record_read_;
+ // Type of compression used
+ CompressionType compression_type_;
+ // Track whether the compression type record has been read or not.
+ bool compression_type_record_read_;
+ StreamingUncompress* uncompress_;
+ // Reusable uncompressed output buffer
+ std::unique_ptr<char[]> uncompressed_buffer_;
+ // Reusable uncompressed record
+ std::string uncompressed_record_;
+ // Used for stream hashing fragment content in ReadRecord()
+ XXH3_state_t* hash_state_;
+ // Used for stream hashing uncompressed buffer in ReadPhysicalRecord()
+ XXH3_state_t* uncompress_hash_state_;
+
+ // Extend record types with the following special values
+ enum {
+ kEof = kMaxRecordType + 1,
+ // Returned whenever we find an invalid physical record.
+ // Currently there are two situations in which this happens:
+ // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
+ // * The record is a 0-length record (No drop is reported)
+ kBadRecord = kMaxRecordType + 2,
+ // Returned when we fail to read a valid header.
+ kBadHeader = kMaxRecordType + 3,
+ // Returned when we read an old record from a previous user of the log.
+ kOldRecord = kMaxRecordType + 4,
+ // Returned when we get a bad record length
+ kBadRecordLen = kMaxRecordType + 5,
+ // Returned when we get a bad record checksum
+ kBadRecordChecksum = kMaxRecordType + 6,
+ };
+
+ // Return type, or one of the preceding special values.
+ // If WAL compression is enabled, fragment_checksum is the checksum of the
+ // fragment computed from the original buffer containing the uncompressed
+ // fragment.
+ unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size,
+ uint64_t* fragment_checksum = nullptr);
+
+ // Read some more
+ bool ReadMore(size_t* drop_size, int* error);
+
+ void UnmarkEOFInternal();
+
+ // Reports dropped bytes to the reporter.
+ // buffer_ must be updated to remove the dropped bytes prior to invocation.
+ void ReportCorruption(size_t bytes, const char* reason);
+ void ReportDrop(size_t bytes, const Status& reason);
+
+ void InitCompression(const CompressionTypeRecord& compression_record);
+};
+
+// Reader subclass that overrides ReadRecord() and UnmarkEOF().
+// NOTE(review): presumably keeps partial-record state across calls -- confirm.
+class FragmentBufferedReader : public Reader {
+ public:
+ FragmentBufferedReader(std::shared_ptr<Logger> info_log,
+ std::unique_ptr<SequentialFileReader>&& _file,
+ Reporter* reporter, bool checksum, uint64_t log_num)
+ : Reader(std::move(info_log), std::move(_file), reporter, checksum, log_num),
+ in_fragmented_record_(false) {}
+ // No copying allowed; deleted explicitly, the same way Reader does it.
+ FragmentBufferedReader(const FragmentBufferedReader&) = delete;
+ void operator=(const FragmentBufferedReader&) = delete;
+ ~FragmentBufferedReader() override = default;
+ bool ReadRecord(Slice* record, std::string* scratch,
+ WALRecoveryMode wal_recovery_mode =
+ WALRecoveryMode::kTolerateCorruptedTailRecords,
+ uint64_t* record_checksum = nullptr) override;
+ void UnmarkEOF() override;
+
+ private:
+ std::string fragments_; // Default-constructed empty by the constructor
+ bool in_fragmented_record_; // Starts false; see the constructor
+
+ bool TryReadFragment(Slice* result, size_t* drop_size,
+ unsigned int* fragment_type_or_err);
+
+ bool TryReadMore(size_t* drop_size, int* error);
+};
+
+} // namespace log
+} // namespace ROCKSDB_NAMESPACE