summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/trace
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/trace/file_trace_reader_writer.cc133
-rw-r--r--src/rocksdb/utilities/trace/file_trace_reader_writer.h48
-rw-r--r--src/rocksdb/utilities/trace/replayer_impl.cc316
-rw-r--r--src/rocksdb/utilities/trace/replayer_impl.h86
4 files changed, 583 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/trace/file_trace_reader_writer.cc b/src/rocksdb/utilities/trace/file_trace_reader_writer.cc
new file mode 100644
index 000000000..5886d3539
--- /dev/null
+++ b/src/rocksdb/utilities/trace/file_trace_reader_writer.cc
@@ -0,0 +1,133 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "utilities/trace/file_trace_reader_writer.h"
+
+#include "env/composite_env_wrapper.h"
+#include "file/random_access_file_reader.h"
+#include "file/writable_file_writer.h"
+#include "trace_replay/trace_replay.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB
+
+FileTraceReader::FileTraceReader(
+ std::unique_ptr<RandomAccessFileReader>&& reader)
+ : file_reader_(std::move(reader)),
+ offset_(0),
+ buffer_(new char[kBufferSize]) {}
+
+FileTraceReader::~FileTraceReader() {
+ Close().PermitUncheckedError();
+ delete[] buffer_;
+}
+
+Status FileTraceReader::Close() {
+ file_reader_.reset();
+ return Status::OK();
+}
+
+Status FileTraceReader::Reset() {
+ if (file_reader_ == nullptr) {
+ return Status::IOError("TraceReader is closed.");
+ }
+ offset_ = 0;
+ return Status::OK();
+}
+
+Status FileTraceReader::Read(std::string* data) {
+ assert(file_reader_ != nullptr);
+ Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,
+ &result_, buffer_, nullptr,
+ Env::IO_TOTAL /* rate_limiter_priority */);
+ if (!s.ok()) {
+ return s;
+ }
+ if (result_.size() == 0) {
+ // No more data to read
+ // Todo: Come up with a better way to indicate end of data. May be this
+ // could be avoided once footer is introduced.
+ return Status::Incomplete();
+ }
+ if (result_.size() < kTraceMetadataSize) {
+ return Status::Corruption("Corrupted trace file.");
+ }
+ *data = result_.ToString();
+ offset_ += kTraceMetadataSize;
+
+ uint32_t payload_len =
+ DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]);
+
+ // Read Payload
+ unsigned int bytes_to_read = payload_len;
+ unsigned int to_read =
+ bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
+ while (to_read > 0) {
+ s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_,
+ nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
+ if (!s.ok()) {
+ return s;
+ }
+ if (result_.size() < to_read) {
+ return Status::Corruption("Corrupted trace file.");
+ }
+ data->append(result_.data(), result_.size());
+
+ offset_ += to_read;
+ bytes_to_read -= to_read;
+ to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
+ }
+
+ return s;
+}
+
+FileTraceWriter::FileTraceWriter(
+ std::unique_ptr<WritableFileWriter>&& file_writer)
+ : file_writer_(std::move(file_writer)) {}
+
+FileTraceWriter::~FileTraceWriter() { Close().PermitUncheckedError(); }
+
+Status FileTraceWriter::Close() {
+ file_writer_.reset();
+ return Status::OK();
+}
+
+Status FileTraceWriter::Write(const Slice& data) {
+ return file_writer_->Append(data);
+}
+
+uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); }
+
+Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
+ const std::string& trace_filename,
+ std::unique_ptr<TraceReader>* trace_reader) {
+ std::unique_ptr<RandomAccessFileReader> file_reader;
+ Status s = RandomAccessFileReader::Create(
+ env->GetFileSystem(), trace_filename, FileOptions(env_options),
+ &file_reader, nullptr);
+ if (!s.ok()) {
+ return s;
+ }
+ trace_reader->reset(new FileTraceReader(std::move(file_reader)));
+ return s;
+}
+
+Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
+ const std::string& trace_filename,
+ std::unique_ptr<TraceWriter>* trace_writer) {
+ std::unique_ptr<WritableFileWriter> file_writer;
+ Status s = WritableFileWriter::Create(env->GetFileSystem(), trace_filename,
+ FileOptions(env_options), &file_writer,
+ nullptr);
+ if (!s.ok()) {
+ return s;
+ }
+ trace_writer->reset(new FileTraceWriter(std::move(file_writer)));
+ return s;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/trace/file_trace_reader_writer.h b/src/rocksdb/utilities/trace/file_trace_reader_writer.h
new file mode 100644
index 000000000..65d483108
--- /dev/null
+++ b/src/rocksdb/utilities/trace/file_trace_reader_writer.h
@@ -0,0 +1,48 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/trace_reader_writer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class RandomAccessFileReader;
+class WritableFileWriter;
+
+// FileTraceReader allows reading RocksDB traces from a file.
+class FileTraceReader : public TraceReader {
+ public:
+ explicit FileTraceReader(std::unique_ptr<RandomAccessFileReader>&& reader);
+ ~FileTraceReader();
+
+ virtual Status Read(std::string* data) override;
+ virtual Status Close() override;
+ virtual Status Reset() override;
+
+ private:
+ std::unique_ptr<RandomAccessFileReader> file_reader_;
+ Slice result_;
+ size_t offset_;
+ char* const buffer_;
+
+ static const unsigned int kBufferSize;
+};
+
+// FileTraceWriter allows writing RocksDB traces to a file.
+class FileTraceWriter : public TraceWriter {
+ public:
+ explicit FileTraceWriter(std::unique_ptr<WritableFileWriter>&& file_writer);
+ ~FileTraceWriter();
+
+ virtual Status Write(const Slice& data) override;
+ virtual Status Close() override;
+ virtual uint64_t GetFileSize() override;
+
+ private:
+ std::unique_ptr<WritableFileWriter> file_writer_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/utilities/trace/replayer_impl.cc b/src/rocksdb/utilities/trace/replayer_impl.cc
new file mode 100644
index 000000000..31023f1a2
--- /dev/null
+++ b/src/rocksdb/utilities/trace/replayer_impl.cc
@@ -0,0 +1,316 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/trace/replayer_impl.h"
+
+#include <cmath>
+#include <thread>
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/system_clock.h"
+#include "util/threadpool_imp.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+ReplayerImpl::ReplayerImpl(DB* db,
+ const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader)
+ : Replayer(),
+ trace_reader_(std::move(reader)),
+ prepared_(false),
+ trace_end_(false),
+ header_ts_(0),
+ exec_handler_(TraceRecord::NewExecutionHandler(db, handles)),
+ env_(db->GetEnv()),
+ trace_file_version_(-1) {}
+
+ReplayerImpl::~ReplayerImpl() {
+ exec_handler_.reset();
+ trace_reader_.reset();
+}
+
+Status ReplayerImpl::Prepare() {
+ Trace header;
+ int db_version;
+ Status s = ReadHeader(&header);
+ if (!s.ok()) {
+ return s;
+ }
+ s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
+ if (!s.ok()) {
+ return s;
+ }
+ header_ts_ = header.ts;
+ prepared_ = true;
+ trace_end_ = false;
+ return Status::OK();
+}
+
+Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
+ if (!prepared_) {
+ return Status::Incomplete("Not prepared!");
+ }
+ if (trace_end_) {
+ return Status::Incomplete("Trace end.");
+ }
+
+ Trace trace;
+ Status s = ReadTrace(&trace); // ReadTrace is atomic
+ // Reached the trace end.
+ if (s.ok() && trace.type == kTraceEnd) {
+ trace_end_ = true;
+ return Status::Incomplete("Trace end.");
+ }
+ if (!s.ok() || record == nullptr) {
+ return s;
+ }
+
+ return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record);
+}
+
+Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ return record->Accept(exec_handler_.get(), result);
+}
+
+Status ReplayerImpl::Replay(
+ const ReplayOptions& options,
+ const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+ result_callback) {
+ if (options.fast_forward <= 0.0) {
+ return Status::InvalidArgument("Wrong fast forward speed!");
+ }
+
+ if (!prepared_) {
+ return Status::Incomplete("Not prepared!");
+ }
+ if (trace_end_) {
+ return Status::Incomplete("Trace end.");
+ }
+
+ Status s = Status::OK();
+
+ if (options.num_threads <= 1) {
+ // num_threads == 0 or num_threads == 1 uses single thread.
+ std::chrono::system_clock::time_point replay_epoch =
+ std::chrono::system_clock::now();
+
+ while (s.ok()) {
+ Trace trace;
+ s = ReadTrace(&trace);
+ // If already at trace end, ReadTrace should return Status::Incomplete().
+ if (!s.ok()) {
+ break;
+ }
+
+ // No need to sleep before breaking the loop if at the trace end.
+ if (trace.type == kTraceEnd) {
+ trace_end_ = true;
+ s = Status::Incomplete("Trace end.");
+ break;
+ }
+
+ // In single-threaded replay, decode first then sleep.
+ std::unique_ptr<TraceRecord> record;
+ s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record);
+ if (!s.ok() && !s.IsNotSupported()) {
+ break;
+ }
+
+ std::chrono::system_clock::time_point sleep_to =
+ replay_epoch +
+ std::chrono::microseconds(static_cast<uint64_t>(std::llround(
+ 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
+ if (sleep_to > std::chrono::system_clock::now()) {
+ std::this_thread::sleep_until(sleep_to);
+ }
+
+ // Skip unsupported traces, stop for other errors.
+ if (s.IsNotSupported()) {
+ if (result_callback != nullptr) {
+ result_callback(s, nullptr);
+ }
+ s = Status::OK();
+ continue;
+ }
+
+ if (result_callback == nullptr) {
+ s = Execute(record, nullptr);
+ } else {
+ std::unique_ptr<TraceRecordResult> res;
+ s = Execute(record, &res);
+ result_callback(s, std::move(res));
+ }
+ }
+ } else {
+ // Multi-threaded replay.
+ ThreadPoolImpl thread_pool;
+ thread_pool.SetHostEnv(env_);
+ thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
+
+ std::mutex mtx;
+ // Background decoding and execution status.
+ Status bg_s = Status::OK();
+ uint64_t last_err_ts = static_cast<uint64_t>(-1);
+ // Callback function used in background work to update bg_s for the ealiest
+ // TraceRecord which has execution error. This is different from the
+ // timestamp of the first execution error (either start or end timestamp).
+ //
+ // Suppose TraceRecord R1, R2, with timestamps T1 < T2. Their execution
+ // timestamps are T1_start, T1_end, T2_start, T2_end.
+ // Single-thread: there must be T1_start < T1_end < T2_start < T2_end.
+ // Multi-thread: T1_start < T2_start may not be enforced. Orders of them are
+ // totally unknown.
+ // In order to report the same `first` error in both single-thread and
+ // multi-thread replay, we can only rely on the TraceRecords' timestamps,
+ // rather than their executin timestamps. Although in single-thread replay,
+ // the first error is also the last error, while in multi-thread replay, the
+ // first error may not be the first error in execution, and it may not be
+ // the last error in exeution as well.
+ auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
+ std::lock_guard<std::mutex> gd(mtx);
+ // Only record the first error.
+ if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
+ bg_s = err;
+ last_err_ts = err_ts;
+ }
+ };
+
+ std::chrono::system_clock::time_point replay_epoch =
+ std::chrono::system_clock::now();
+
+ while (bg_s.ok() && s.ok()) {
+ Trace trace;
+ s = ReadTrace(&trace);
+ // If already at trace end, ReadTrace should return Status::Incomplete().
+ if (!s.ok()) {
+ break;
+ }
+
+ TraceType trace_type = trace.type;
+
+ // No need to sleep before breaking the loop if at the trace end.
+ if (trace_type == kTraceEnd) {
+ trace_end_ = true;
+ s = Status::Incomplete("Trace end.");
+ break;
+ }
+
+ // In multi-threaded replay, sleep first then start decoding and
+ // execution in a thread.
+ std::chrono::system_clock::time_point sleep_to =
+ replay_epoch +
+ std::chrono::microseconds(static_cast<uint64_t>(std::llround(
+ 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
+ if (sleep_to > std::chrono::system_clock::now()) {
+ std::this_thread::sleep_until(sleep_to);
+ }
+
+ if (trace_type == kTraceWrite || trace_type == kTraceGet ||
+ trace_type == kTraceIteratorSeek ||
+ trace_type == kTraceIteratorSeekForPrev ||
+ trace_type == kTraceMultiGet) {
+ std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
+ ra->trace_entry = std::move(trace);
+ ra->handler = exec_handler_.get();
+ ra->trace_file_version = trace_file_version_;
+ ra->error_cb = error_cb;
+ ra->result_cb = result_callback;
+ thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
+ nullptr, nullptr);
+ } else {
+ // Skip unsupported traces.
+ if (result_callback != nullptr) {
+ result_callback(Status::NotSupported("Unsupported trace type."),
+ nullptr);
+ }
+ }
+ }
+
+ thread_pool.WaitForJobsAndJoinAllThreads();
+ if (!bg_s.ok()) {
+ s = bg_s;
+ }
+ }
+
+ if (s.IsIncomplete()) {
+ // Reaching eof returns Incomplete status at the moment.
+ // Could happen when killing a process without calling EndTrace() API.
+ // TODO: Add better error handling.
+ trace_end_ = true;
+ return Status::OK();
+ }
+ return s;
+}
+
+uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
+
+Status ReplayerImpl::ReadHeader(Trace* header) {
+ assert(header != nullptr);
+ Status s = trace_reader_->Reset();
+ if (!s.ok()) {
+ return s;
+ }
+ std::string encoded_trace;
+ // Read the trace head
+ s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+
+ return TracerHelper::DecodeHeader(encoded_trace, header);
+}
+
+Status ReplayerImpl::ReadTrace(Trace* trace) {
+ assert(trace != nullptr);
+ std::string encoded_trace;
+ // We don't know if TraceReader is implemented thread-safe, so we protect the
+ // reading trace part with a mutex. The decoding part does not need to be
+ // protected since it's local.
+ {
+ std::lock_guard<std::mutex> guard(mutex_);
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ return TracerHelper::DecodeTrace(encoded_trace, trace);
+}
+
+void ReplayerImpl::BackgroundWork(void* arg) {
+ std::unique_ptr<ReplayerWorkerArg> ra(
+ reinterpret_cast<ReplayerWorkerArg*>(arg));
+ assert(ra != nullptr);
+
+ std::unique_ptr<TraceRecord> record;
+ Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry),
+ ra->trace_file_version, &record);
+ if (!s.ok()) {
+ // Stop the replay
+ if (ra->error_cb != nullptr) {
+ ra->error_cb(s, ra->trace_entry.ts);
+ }
+ // Report the result
+ if (ra->result_cb != nullptr) {
+ ra->result_cb(s, nullptr);
+ }
+ return;
+ }
+
+ if (ra->result_cb == nullptr) {
+ s = record->Accept(ra->handler, nullptr);
+ } else {
+ std::unique_ptr<TraceRecordResult> res;
+ s = record->Accept(ra->handler, &res);
+ ra->result_cb(s, std::move(res));
+ }
+ record.reset();
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/trace/replayer_impl.h b/src/rocksdb/utilities/trace/replayer_impl.h
new file mode 100644
index 000000000..367b0b51e
--- /dev/null
+++ b/src/rocksdb/utilities/trace/replayer_impl.h
@@ -0,0 +1,86 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/trace_record_result.h"
+#include "rocksdb/utilities/replayer.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ReplayerImpl : public Replayer {
+ public:
+ ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
+ std::unique_ptr<TraceReader>&& reader);
+ ~ReplayerImpl() override;
+
+ using Replayer::Prepare;
+ Status Prepare() override;
+
+ using Replayer::Next;
+ Status Next(std::unique_ptr<TraceRecord>* record) override;
+
+ using Replayer::Execute;
+ Status Execute(const std::unique_ptr<TraceRecord>& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+
+ using Replayer::Replay;
+ Status Replay(
+ const ReplayOptions& options,
+ const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
+ result_callback) override;
+
+ using Replayer::GetHeaderTimestamp;
+ uint64_t GetHeaderTimestamp() const override;
+
+ private:
+ Status ReadHeader(Trace* header);
+ Status ReadTrace(Trace* trace);
+
+ // Generic function to execute a Trace in a thread pool.
+ static void BackgroundWork(void* arg);
+
+ std::unique_ptr<TraceReader> trace_reader_;
+ std::mutex mutex_;
+ std::atomic<bool> prepared_;
+ std::atomic<bool> trace_end_;
+ uint64_t header_ts_;
+ std::unique_ptr<TraceRecord::Handler> exec_handler_;
+ Env* env_;
+ // When reading the trace header, the trace file version can be parsed.
+ // Replayer will use different decode method to get the trace content based
+ // on different trace file version.
+ int trace_file_version_;
+};
+
+// Arguments passed to BackgroundWork() for replaying in a thread pool.
+struct ReplayerWorkerArg {
+ Trace trace_entry;
+ int trace_file_version;
+ // Handler to execute TraceRecord.
+ TraceRecord::Handler* handler;
+ // Callback function to report the error status and the timestamp of the
+ // TraceRecord (not the start/end timestamp of executing the TraceRecord).
+ std::function<void(Status, uint64_t)> error_cb;
+ // Callback function to report the trace execution status and operation
+ // execution status/result(s).
+ std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE