diff options
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/utilities/trace/file_trace_reader_writer.cc | 123 | ||||
-rw-r--r-- | src/rocksdb/utilities/trace/file_trace_reader_writer.h | 48 |
2 files changed, 171 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/trace/file_trace_reader_writer.cc b/src/rocksdb/utilities/trace/file_trace_reader_writer.cc new file mode 100644 index 000000000..7160f7a4c --- /dev/null +++ b/src/rocksdb/utilities/trace/file_trace_reader_writer.cc @@ -0,0 +1,123 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/trace/file_trace_reader_writer.h" + +#include "env/composite_env_wrapper.h" +#include "file/random_access_file_reader.h" +#include "file/writable_file_writer.h" +#include "trace_replay/trace_replay.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB + +FileTraceReader::FileTraceReader( + std::unique_ptr<RandomAccessFileReader>&& reader) + : file_reader_(std::move(reader)), + offset_(0), + buffer_(new char[kBufferSize]) {} + +FileTraceReader::~FileTraceReader() { + Close(); + delete[] buffer_; +} + +Status FileTraceReader::Close() { + file_reader_.reset(); + return Status::OK(); +} + +Status FileTraceReader::Read(std::string* data) { + assert(file_reader_ != nullptr); + Status s = file_reader_->Read(offset_, kTraceMetadataSize, &result_, buffer_); + if (!s.ok()) { + return s; + } + if (result_.size() == 0) { + // No more data to read + // Todo: Come up with a better way to indicate end of data. May be this + // could be avoided once footer is introduced. + return Status::Incomplete(); + } + if (result_.size() < kTraceMetadataSize) { + return Status::Corruption("Corrupted trace file."); + } + *data = result_.ToString(); + offset_ += kTraceMetadataSize; + + uint32_t payload_len = + DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]); + + // Read Payload + unsigned int bytes_to_read = payload_len; + unsigned int to_read = + bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; + while (to_read > 0) { + s = file_reader_->Read(offset_, to_read, &result_, buffer_); + if (!s.ok()) { + return s; + } + if (result_.size() < to_read) { + return Status::Corruption("Corrupted trace file."); + } + data->append(result_.data(), result_.size()); + + offset_ += to_read; + bytes_to_read -= to_read; + to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; + } + + return s; +} + +FileTraceWriter::~FileTraceWriter() { Close(); } + +Status FileTraceWriter::Close() { + file_writer_.reset(); + return Status::OK(); +} + +Status FileTraceWriter::Write(const Slice& data) { + return file_writer_->Append(data); +} + +uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); } + +Status NewFileTraceReader(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr<TraceReader>* trace_reader) { + std::unique_ptr<RandomAccessFile> trace_file; + Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr<RandomAccessFileReader> file_reader; + file_reader.reset(new RandomAccessFileReader( + NewLegacyRandomAccessFileWrapper(trace_file), trace_filename)); + trace_reader->reset(new FileTraceReader(std::move(file_reader))); + return s; +} + +Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr<TraceWriter>* trace_writer) { + std::unique_ptr<WritableFile> trace_file; + Status s = env->NewWritableFile(trace_filename, &trace_file, env_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr<WritableFileWriter> file_writer; + file_writer.reset(new WritableFileWriter( + NewLegacyWritableFileWrapper(std::move(trace_file)), trace_filename, + env_options)); + trace_writer->reset(new FileTraceWriter(std::move(file_writer))); + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/trace/file_trace_reader_writer.h b/src/rocksdb/utilities/trace/file_trace_reader_writer.h new file mode 100644 index 000000000..a9eafa5af --- /dev/null +++ b/src/rocksdb/utilities/trace/file_trace_reader_writer.h @@ -0,0 +1,48 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/trace_reader_writer.h" + +namespace ROCKSDB_NAMESPACE { + +class RandomAccessFileReader; +class WritableFileWriter; + +// FileTraceReader allows reading RocksDB traces from a file. +class FileTraceReader : public TraceReader { + public: + explicit FileTraceReader(std::unique_ptr<RandomAccessFileReader>&& reader); + ~FileTraceReader(); + + virtual Status Read(std::string* data) override; + virtual Status Close() override; + + private: + std::unique_ptr<RandomAccessFileReader> file_reader_; + Slice result_; + size_t offset_; + char* const buffer_; + + static const unsigned int kBufferSize; +}; + +// FileTraceWriter allows writing RocksDB traces to a file. +class FileTraceWriter : public TraceWriter { + public: + explicit FileTraceWriter(std::unique_ptr<WritableFileWriter>&& file_writer) + : file_writer_(std::move(file_writer)) {} + ~FileTraceWriter(); + + virtual Status Write(const Slice& data) override; + virtual Status Close() override; + virtual uint64_t GetFileSize() override; + + private: + std::unique_ptr<WritableFileWriter> file_writer_; +}; + +} // namespace ROCKSDB_NAMESPACE |