diff options
Diffstat (limited to 'src/rocksdb/db/log_writer.cc')
-rw-r--r-- | src/rocksdb/db/log_writer.cc | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/src/rocksdb/db/log_writer.cc b/src/rocksdb/db/log_writer.cc new file mode 100644 index 00000000..bc99931b --- /dev/null +++ b/src/rocksdb/db/log_writer.cc @@ -0,0 +1,147 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/log_writer.h" + +#include <stdint.h> +#include "rocksdb/env.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/file_reader_writer.h" + +namespace rocksdb { +namespace log { + +Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush) + : dest_(std::move(dest)), + block_offset_(0), + log_number_(log_number), + recycle_log_files_(recycle_log_files), + manual_flush_(manual_flush) { + for (int i = 0; i <= kMaxRecordType; i++) { + char t = static_cast<char>(i); + type_crc_[i] = crc32c::Value(&t, 1); + } +} + +Writer::~Writer() { WriteBuffer(); } + +Status Writer::WriteBuffer() { return dest_->Flush(); } + +Status Writer::AddRecord(const Slice& slice) { + const char* ptr = slice.data(); + size_t left = slice.size(); + + // Header size varies depending on whether we are recycling or not. + const int header_size = + recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize; + + // Fragment the record if necessary and emit it. Note that if slice + // is empty, we still want to iterate once to emit a single + // zero-length record + Status s; + bool begin = true; + do { + const int64_t leftover = kBlockSize - block_offset_; + assert(leftover >= 0); + if (leftover < header_size) { + // Switch to a new block + if (leftover > 0) { + // Fill the trailer (literal below relies on kHeaderSize and + // kRecyclableHeaderSize being <= 11) + assert(header_size <= 11); + s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + static_cast<size_t>(leftover))); + if (!s.ok()) { + break; + } + } + block_offset_ = 0; + } + + // Invariant: we never leave < header_size bytes in a block. + assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size); + + const size_t avail = kBlockSize - block_offset_ - header_size; + const size_t fragment_length = (left < avail) ? left : avail; + + RecordType type; + const bool end = (left == fragment_length); + if (begin && end) { + type = recycle_log_files_ ? kRecyclableFullType : kFullType; + } else if (begin) { + type = recycle_log_files_ ? kRecyclableFirstType : kFirstType; + } else if (end) { + type = recycle_log_files_ ? kRecyclableLastType : kLastType; + } else { + type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType; + } + + s = EmitPhysicalRecord(type, ptr, fragment_length); + ptr += fragment_length; + left -= fragment_length; + begin = false; + } while (s.ok() && left > 0); + return s; +} + +bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } + +Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { + assert(n <= 0xffff); // Must fit in two bytes + + size_t header_size; + char buf[kRecyclableHeaderSize]; + + // Format the header + buf[4] = static_cast<char>(n & 0xff); + buf[5] = static_cast<char>(n >> 8); + buf[6] = static_cast<char>(t); + + uint32_t crc = type_crc_[t]; + if (t < kRecyclableFullType) { + // Legacy record format + assert(block_offset_ + kHeaderSize + n <= kBlockSize); + header_size = kHeaderSize; + } else { + // Recyclable record format + assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize); + header_size = kRecyclableHeaderSize; + + // Only encode low 32-bits of the 64-bit log number. This means + // we will fail to detect an old record if we recycled a log from + // ~4 billion logs ago, but that is effectively impossible, and + // even if it were we'dbe far more likely to see a false positive + // on the 32-bit CRC. + EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_)); + crc = crc32c::Extend(crc, buf + 7, 4); + } + + // Compute the crc of the record type and the payload. + crc = crc32c::Extend(crc, ptr, n); + crc = crc32c::Mask(crc); // Adjust for storage + EncodeFixed32(buf, crc); + + // Write the header and the payload + Status s = dest_->Append(Slice(buf, header_size)); + if (s.ok()) { + s = dest_->Append(Slice(ptr, n)); + if (s.ok()) { + if (!manual_flush_) { + s = dest_->Flush(); + } + } + } + block_offset_ += header_size + n; + return s; +} + +} // namespace log +} // namespace rocksdb |