diff options
Diffstat (limited to 'src/rocksdb/utilities/trace')
-rw-r--r-- | src/rocksdb/utilities/trace/file_trace_reader_writer.cc | 133 | ||||
-rw-r--r-- | src/rocksdb/utilities/trace/file_trace_reader_writer.h | 48 | ||||
-rw-r--r-- | src/rocksdb/utilities/trace/replayer_impl.cc | 316 | ||||
-rw-r--r-- | src/rocksdb/utilities/trace/replayer_impl.h | 86 |
4 files changed, 583 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/trace/file_trace_reader_writer.cc b/src/rocksdb/utilities/trace/file_trace_reader_writer.cc new file mode 100644 index 000000000..5886d3539 --- /dev/null +++ b/src/rocksdb/utilities/trace/file_trace_reader_writer.cc @@ -0,0 +1,133 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/trace/file_trace_reader_writer.h" + +#include "env/composite_env_wrapper.h" +#include "file/random_access_file_reader.h" +#include "file/writable_file_writer.h" +#include "trace_replay/trace_replay.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB + +FileTraceReader::FileTraceReader( + std::unique_ptr<RandomAccessFileReader>&& reader) + : file_reader_(std::move(reader)), + offset_(0), + buffer_(new char[kBufferSize]) {} + +FileTraceReader::~FileTraceReader() { + Close().PermitUncheckedError(); + delete[] buffer_; +} + +Status FileTraceReader::Close() { + file_reader_.reset(); + return Status::OK(); +} + +Status FileTraceReader::Reset() { + if (file_reader_ == nullptr) { + return Status::IOError("TraceReader is closed."); + } + offset_ = 0; + return Status::OK(); +} + +Status FileTraceReader::Read(std::string* data) { + assert(file_reader_ != nullptr); + Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize, + &result_, buffer_, nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); + if (!s.ok()) { + return s; + } + if (result_.size() == 0) { + // No more data to read + // Todo: Come up with a better way to indicate end of data. May be this + // could be avoided once footer is introduced. + return Status::Incomplete(); + } + if (result_.size() < kTraceMetadataSize) { + return Status::Corruption("Corrupted trace file."); + } + *data = result_.ToString(); + offset_ += kTraceMetadataSize; + + uint32_t payload_len = + DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]); + + // Read Payload + unsigned int bytes_to_read = payload_len; + unsigned int to_read = + bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; + while (to_read > 0) { + s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_, + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + if (!s.ok()) { + return s; + } + if (result_.size() < to_read) { + return Status::Corruption("Corrupted trace file."); + } + data->append(result_.data(), result_.size()); + + offset_ += to_read; + bytes_to_read -= to_read; + to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; + } + + return s; +} + +FileTraceWriter::FileTraceWriter( + std::unique_ptr<WritableFileWriter>&& file_writer) + : file_writer_(std::move(file_writer)) {} + +FileTraceWriter::~FileTraceWriter() { Close().PermitUncheckedError(); } + +Status FileTraceWriter::Close() { + file_writer_.reset(); + return Status::OK(); +} + +Status FileTraceWriter::Write(const Slice& data) { + return file_writer_->Append(data); +} + +uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); } + +Status NewFileTraceReader(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr<TraceReader>* trace_reader) { + std::unique_ptr<RandomAccessFileReader> file_reader; + Status s = RandomAccessFileReader::Create( + env->GetFileSystem(), trace_filename, FileOptions(env_options), + &file_reader, nullptr); + if (!s.ok()) { + return s; + } + trace_reader->reset(new FileTraceReader(std::move(file_reader))); + return s; +} + +Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr<TraceWriter>* trace_writer) { + std::unique_ptr<WritableFileWriter> file_writer; + Status s = WritableFileWriter::Create(env->GetFileSystem(), trace_filename, + FileOptions(env_options), &file_writer, + nullptr); + if (!s.ok()) { + return s; + } + trace_writer->reset(new FileTraceWriter(std::move(file_writer))); + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/trace/file_trace_reader_writer.h b/src/rocksdb/utilities/trace/file_trace_reader_writer.h new file mode 100644 index 000000000..65d483108 --- /dev/null +++ b/src/rocksdb/utilities/trace/file_trace_reader_writer.h @@ -0,0 +1,48 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/trace_reader_writer.h" + +namespace ROCKSDB_NAMESPACE { + +class RandomAccessFileReader; +class WritableFileWriter; + +// FileTraceReader allows reading RocksDB traces from a file. +class FileTraceReader : public TraceReader { + public: + explicit FileTraceReader(std::unique_ptr<RandomAccessFileReader>&& reader); + ~FileTraceReader(); + + virtual Status Read(std::string* data) override; + virtual Status Close() override; + virtual Status Reset() override; + + private: + std::unique_ptr<RandomAccessFileReader> file_reader_; + Slice result_; + size_t offset_; + char* const buffer_; + + static const unsigned int kBufferSize; +}; + +// FileTraceWriter allows writing RocksDB traces to a file. +class FileTraceWriter : public TraceWriter { + public: + explicit FileTraceWriter(std::unique_ptr<WritableFileWriter>&& file_writer); + ~FileTraceWriter(); + + virtual Status Write(const Slice& data) override; + virtual Status Close() override; + virtual uint64_t GetFileSize() override; + + private: + std::unique_ptr<WritableFileWriter> file_writer_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/utilities/trace/replayer_impl.cc b/src/rocksdb/utilities/trace/replayer_impl.cc new file mode 100644 index 000000000..31023f1a2 --- /dev/null +++ b/src/rocksdb/utilities/trace/replayer_impl.cc @@ -0,0 +1,316 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/trace/replayer_impl.h" + +#include <cmath> +#include <thread> + +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/system_clock.h" +#include "util/threadpool_imp.h" + +namespace ROCKSDB_NAMESPACE { + +ReplayerImpl::ReplayerImpl(DB* db, + const std::vector<ColumnFamilyHandle*>& handles, + std::unique_ptr<TraceReader>&& reader) + : Replayer(), + trace_reader_(std::move(reader)), + prepared_(false), + trace_end_(false), + header_ts_(0), + exec_handler_(TraceRecord::NewExecutionHandler(db, handles)), + env_(db->GetEnv()), + trace_file_version_(-1) {} + +ReplayerImpl::~ReplayerImpl() { + exec_handler_.reset(); + trace_reader_.reset(); +} + +Status ReplayerImpl::Prepare() { + Trace header; + int db_version; + Status s = ReadHeader(&header); + if (!s.ok()) { + return s; + } + s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); + if (!s.ok()) { + return s; + } + header_ts_ = header.ts; + prepared_ = true; + trace_end_ = false; + return Status::OK(); +} + +Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) { + if (!prepared_) { + return Status::Incomplete("Not prepared!"); + } + if (trace_end_) { + return Status::Incomplete("Trace end."); + } + + Trace trace; + Status s = ReadTrace(&trace); // ReadTrace is atomic + // Reached the trace end. + if (s.ok() && trace.type == kTraceEnd) { + trace_end_ = true; + return Status::Incomplete("Trace end."); + } + if (!s.ok() || record == nullptr) { + return s; + } + + return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record); +} + +Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record, + std::unique_ptr<TraceRecordResult>* result) { + return record->Accept(exec_handler_.get(), result); +} + +Status ReplayerImpl::Replay( + const ReplayOptions& options, + const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>& + result_callback) { + if (options.fast_forward <= 0.0) { + return Status::InvalidArgument("Wrong fast forward speed!"); + } + + if (!prepared_) { + return Status::Incomplete("Not prepared!"); + } + if (trace_end_) { + return Status::Incomplete("Trace end."); + } + + Status s = Status::OK(); + + if (options.num_threads <= 1) { + // num_threads == 0 or num_threads == 1 uses single thread. + std::chrono::system_clock::time_point replay_epoch = + std::chrono::system_clock::now(); + + while (s.ok()) { + Trace trace; + s = ReadTrace(&trace); + // If already at trace end, ReadTrace should return Status::Incomplete(). + if (!s.ok()) { + break; + } + + // No need to sleep before breaking the loop if at the trace end. + if (trace.type == kTraceEnd) { + trace_end_ = true; + s = Status::Incomplete("Trace end."); + break; + } + + // In single-threaded replay, decode first then sleep. + std::unique_ptr<TraceRecord> record; + s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record); + if (!s.ok() && !s.IsNotSupported()) { + break; + } + + std::chrono::system_clock::time_point sleep_to = + replay_epoch + + std::chrono::microseconds(static_cast<uint64_t>(std::llround( + 1.0 * (trace.ts - header_ts_) / options.fast_forward))); + if (sleep_to > std::chrono::system_clock::now()) { + std::this_thread::sleep_until(sleep_to); + } + + // Skip unsupported traces, stop for other errors. + if (s.IsNotSupported()) { + if (result_callback != nullptr) { + result_callback(s, nullptr); + } + s = Status::OK(); + continue; + } + + if (result_callback == nullptr) { + s = Execute(record, nullptr); + } else { + std::unique_ptr<TraceRecordResult> res; + s = Execute(record, &res); + result_callback(s, std::move(res)); + } + } + } else { + // Multi-threaded replay. + ThreadPoolImpl thread_pool; + thread_pool.SetHostEnv(env_); + thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads)); + + std::mutex mtx; + // Background decoding and execution status. + Status bg_s = Status::OK(); + uint64_t last_err_ts = static_cast<uint64_t>(-1); + // Callback function used in background work to update bg_s for the ealiest + // TraceRecord which has execution error. This is different from the + // timestamp of the first execution error (either start or end timestamp). + // + // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution + // timestamps are T1_start, T1_end, T2_start, T2_end. + // Single-thread: there must be T1_start < T1_end < T2_start < T2_end. + // Multi-thread: T1_start < T2_start may not be enforced. Orders of them are + // totally unknown. + // In order to report the same `first` error in both single-thread and + // multi-thread replay, we can only rely on the TraceRecords' timestamps, + // rather than their executin timestamps. Although in single-thread replay, + // the first error is also the last error, while in multi-thread replay, the + // first error may not be the first error in execution, and it may not be + // the last error in exeution as well. + auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) { + std::lock_guard<std::mutex> gd(mtx); + // Only record the first error. + if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) { + bg_s = err; + last_err_ts = err_ts; + } + }; + + std::chrono::system_clock::time_point replay_epoch = + std::chrono::system_clock::now(); + + while (bg_s.ok() && s.ok()) { + Trace trace; + s = ReadTrace(&trace); + // If already at trace end, ReadTrace should return Status::Incomplete(). + if (!s.ok()) { + break; + } + + TraceType trace_type = trace.type; + + // No need to sleep before breaking the loop if at the trace end. + if (trace_type == kTraceEnd) { + trace_end_ = true; + s = Status::Incomplete("Trace end."); + break; + } + + // In multi-threaded replay, sleep first then start decoding and + // execution in a thread. + std::chrono::system_clock::time_point sleep_to = + replay_epoch + + std::chrono::microseconds(static_cast<uint64_t>(std::llround( + 1.0 * (trace.ts - header_ts_) / options.fast_forward))); + if (sleep_to > std::chrono::system_clock::now()) { + std::this_thread::sleep_until(sleep_to); + } + + if (trace_type == kTraceWrite || trace_type == kTraceGet || + trace_type == kTraceIteratorSeek || + trace_type == kTraceIteratorSeekForPrev || + trace_type == kTraceMultiGet) { + std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg); + ra->trace_entry = std::move(trace); + ra->handler = exec_handler_.get(); + ra->trace_file_version = trace_file_version_; + ra->error_cb = error_cb; + ra->result_cb = result_callback; + thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(), + nullptr, nullptr); + } else { + // Skip unsupported traces. + if (result_callback != nullptr) { + result_callback(Status::NotSupported("Unsupported trace type."), + nullptr); + } + } + } + + thread_pool.WaitForJobsAndJoinAllThreads(); + if (!bg_s.ok()) { + s = bg_s; + } + } + + if (s.IsIncomplete()) { + // Reaching eof returns Incomplete status at the moment. + // Could happen when killing a process without calling EndTrace() API. + // TODO: Add better error handling. + trace_end_ = true; + return Status::OK(); + } + return s; +} + +uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; } + +Status ReplayerImpl::ReadHeader(Trace* header) { + assert(header != nullptr); + Status s = trace_reader_->Reset(); + if (!s.ok()) { + return s; + } + std::string encoded_trace; + // Read the trace head + s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + + return TracerHelper::DecodeHeader(encoded_trace, header); +} + +Status ReplayerImpl::ReadTrace(Trace* trace) { + assert(trace != nullptr); + std::string encoded_trace; + // We don't know if TraceReader is implemented thread-safe, so we protect the + // reading trace part with a mutex. The decoding part does not need to be + // protected since it's local. + { + std::lock_guard<std::mutex> guard(mutex_); + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + } + return TracerHelper::DecodeTrace(encoded_trace, trace); +} + +void ReplayerImpl::BackgroundWork(void* arg) { + std::unique_ptr<ReplayerWorkerArg> ra( + reinterpret_cast<ReplayerWorkerArg*>(arg)); + assert(ra != nullptr); + + std::unique_ptr<TraceRecord> record; + Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry), + ra->trace_file_version, &record); + if (!s.ok()) { + // Stop the replay + if (ra->error_cb != nullptr) { + ra->error_cb(s, ra->trace_entry.ts); + } + // Report the result + if (ra->result_cb != nullptr) { + ra->result_cb(s, nullptr); + } + return; + } + + if (ra->result_cb == nullptr) { + s = record->Accept(ra->handler, nullptr); + } else { + std::unique_ptr<TraceRecordResult> res; + s = record->Accept(ra->handler, &res); + ra->result_cb(s, std::move(res)); + } + record.reset(); +} + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/trace/replayer_impl.h b/src/rocksdb/utilities/trace/replayer_impl.h new file mode 100644 index 000000000..367b0b51e --- /dev/null +++ b/src/rocksdb/utilities/trace/replayer_impl.h @@ -0,0 +1,86 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include <atomic> +#include <functional> +#include <memory> +#include <mutex> +#include <unordered_map> + +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" +#include "rocksdb/trace_record_result.h" +#include "rocksdb/utilities/replayer.h" +#include "trace_replay/trace_replay.h" + +namespace ROCKSDB_NAMESPACE { + +class ReplayerImpl : public Replayer { + public: + ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles, + std::unique_ptr<TraceReader>&& reader); + ~ReplayerImpl() override; + + using Replayer::Prepare; + Status Prepare() override; + + using Replayer::Next; + Status Next(std::unique_ptr<TraceRecord>* record) override; + + using Replayer::Execute; + Status Execute(const std::unique_ptr<TraceRecord>& record, + std::unique_ptr<TraceRecordResult>* result) override; + + using Replayer::Replay; + Status Replay( + const ReplayOptions& options, + const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>& + result_callback) override; + + using Replayer::GetHeaderTimestamp; + uint64_t GetHeaderTimestamp() const override; + + private: + Status ReadHeader(Trace* header); + Status ReadTrace(Trace* trace); + + // Generic function to execute a Trace in a thread pool. + static void BackgroundWork(void* arg); + + std::unique_ptr<TraceReader> trace_reader_; + std::mutex mutex_; + std::atomic<bool> prepared_; + std::atomic<bool> trace_end_; + uint64_t header_ts_; + std::unique_ptr<TraceRecord::Handler> exec_handler_; + Env* env_; + // When reading the trace header, the trace file version can be parsed. + // Replayer will use different decode method to get the trace content based + // on different trace file version. + int trace_file_version_; +}; + +// Arguments passed to BackgroundWork() for replaying in a thread pool. +struct ReplayerWorkerArg { + Trace trace_entry; + int trace_file_version; + // Handler to execute TraceRecord. + TraceRecord::Handler* handler; + // Callback function to report the error status and the timestamp of the + // TraceRecord (not the start/end timestamp of executing the TraceRecord). + std::function<void(Status, uint64_t)> error_cb; + // Callback function to report the trace execution status and operation + // execution status/result(s). + std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb; +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE |