summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/trace_replay
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/trace_replay
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/trace_replay')
-rw-r--r--src/rocksdb/trace_replay/block_cache_tracer.cc504
-rw-r--r--src/rocksdb/trace_replay/block_cache_tracer.h239
-rw-r--r--src/rocksdb/trace_replay/block_cache_tracer_test.cc421
-rw-r--r--src/rocksdb/trace_replay/io_tracer.cc303
-rw-r--r--src/rocksdb/trace_replay/io_tracer.h185
-rw-r--r--src/rocksdb/trace_replay/io_tracer_test.cc353
-rw-r--r--src/rocksdb/trace_replay/trace_record.cc206
-rw-r--r--src/rocksdb/trace_replay/trace_record_handler.cc190
-rw-r--r--src/rocksdb/trace_replay/trace_record_handler.h46
-rw-r--r--src/rocksdb/trace_replay/trace_record_result.cc146
-rw-r--r--src/rocksdb/trace_replay/trace_replay.cc622
-rw-r--r--src/rocksdb/trace_replay/trace_replay.h183
12 files changed, 3398 insertions, 0 deletions
diff --git a/src/rocksdb/trace_replay/block_cache_tracer.cc b/src/rocksdb/trace_replay/block_cache_tracer.cc
new file mode 100644
index 000000000..508596913
--- /dev/null
+++ b/src/rocksdb/trace_replay/block_cache_tracer.cc
@@ -0,0 +1,504 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/block_cache_tracer.h"
+
+#include <cinttypes>
+#include <cstdio>
+#include <cstdlib>
+
+#include "db/db_impl/db_impl.h"
+#include "db/dbformat.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/trace_record.h"
+#include "util/coding.h"
+#include "util/hash.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+bool ShouldTrace(const Slice& block_key,
+ const BlockCacheTraceOptions& trace_options) {
+ if (trace_options.sampling_frequency == 0 ||
+ trace_options.sampling_frequency == 1) {
+ return true;
+ }
+ // We use spatial downsampling so that we have a complete access history for a
+ // block.
+ return 0 == GetSliceRangedNPHash(block_key, trace_options.sampling_frequency);
+}
+} // namespace
+
+const uint64_t kMicrosInSecond = 1000 * 1000;
+const uint64_t kSecondInMinute = 60;
+const uint64_t kSecondInHour = 3600;
+const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
+ "UnknownColumnFamily";
+const uint64_t BlockCacheTraceRecord::kReservedGetId = 0;
+const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
+
+bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
+ TraceType block_type, TableReaderCaller caller) {
+ return (block_type == TraceType::kBlockTraceDataBlock) &&
+ IsGetOrMultiGet(caller);
+}
+
+bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
+ return caller == TableReaderCaller::kUserGet ||
+ caller == TableReaderCaller::kUserMultiGet;
+}
+
+bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
+ return caller == TableReaderCaller::kUserGet ||
+ caller == TableReaderCaller::kUserMultiGet ||
+ caller == TableReaderCaller::kUserIterator ||
+ caller == TableReaderCaller::kUserApproximateSize ||
+ caller == TableReaderCaller::kUserVerifyChecksum;
+}
+
+std::string BlockCacheTraceHelper::ComputeRowKey(
+ const BlockCacheTraceRecord& access) {
+ if (!IsGetOrMultiGet(access.caller)) {
+ return "";
+ }
+ Slice key = ExtractUserKey(access.referenced_key);
+ return std::to_string(access.sst_fd_number) + "_" + key.ToString();
+}
+
+uint64_t BlockCacheTraceHelper::GetTableId(
+ const BlockCacheTraceRecord& access) {
+ if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
+ return 0;
+ }
+ return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
+}
+
+uint64_t BlockCacheTraceHelper::GetSequenceNumber(
+ const BlockCacheTraceRecord& access) {
+ if (!IsGetOrMultiGet(access.caller)) {
+ return 0;
+ }
+ return access.get_from_user_specified_snapshot
+ ? 1 + GetInternalKeySeqno(access.referenced_key)
+ : 0;
+}
+
+uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
+ const BlockCacheTraceRecord& access) {
+ Slice input(access.block_key);
+ uint64_t offset = 0;
+ while (true) {
+ uint64_t tmp = 0;
+ if (GetVarint64(&input, &tmp)) {
+ offset = tmp;
+ } else {
+ break;
+ }
+ }
+ return offset;
+}
+
+BlockCacheTraceWriterImpl::BlockCacheTraceWriterImpl(
+ SystemClock* clock, const BlockCacheTraceWriterOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer)
+ : clock_(clock),
+ trace_options_(trace_options),
+ trace_writer_(std::move(trace_writer)) {}
+
+Status BlockCacheTraceWriterImpl::WriteBlockAccess(
+ const BlockCacheTraceRecord& record, const Slice& block_key,
+ const Slice& cf_name, const Slice& referenced_key) {
+ uint64_t trace_file_size = trace_writer_->GetFileSize();
+ if (trace_file_size > trace_options_.max_trace_file_size) {
+ return Status::OK();
+ }
+ Trace trace;
+ trace.ts = record.access_timestamp;
+ trace.type = record.block_type;
+ PutLengthPrefixedSlice(&trace.payload, block_key);
+ PutFixed64(&trace.payload, record.block_size);
+ PutFixed64(&trace.payload, record.cf_id);
+ PutLengthPrefixedSlice(&trace.payload, cf_name);
+ PutFixed32(&trace.payload, record.level);
+ PutFixed64(&trace.payload, record.sst_fd_number);
+ trace.payload.push_back(record.caller);
+ trace.payload.push_back(record.is_cache_hit);
+ trace.payload.push_back(record.no_insert);
+ if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
+ PutFixed64(&trace.payload, record.get_id);
+ trace.payload.push_back(record.get_from_user_specified_snapshot);
+ PutLengthPrefixedSlice(&trace.payload, referenced_key);
+ }
+ if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
+ record.caller)) {
+ PutFixed64(&trace.payload, record.referenced_data_size);
+ PutFixed64(&trace.payload, record.num_keys_in_block);
+ trace.payload.push_back(record.referenced_key_exist_in_block);
+ }
+ std::string encoded_trace;
+ TracerHelper::EncodeTrace(trace, &encoded_trace);
+ return trace_writer_->Write(encoded_trace);
+}
+
+Status BlockCacheTraceWriterImpl::WriteHeader() {
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = TraceType::kTraceBegin;
+ PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
+ PutFixed32(&trace.payload, kMajorVersion);
+ PutFixed32(&trace.payload, kMinorVersion);
+ std::string encoded_trace;
+ TracerHelper::EncodeTrace(trace, &encoded_trace);
+ return trace_writer_->Write(encoded_trace);
+}
+
+BlockCacheTraceReader::BlockCacheTraceReader(
+ std::unique_ptr<TraceReader>&& reader)
+ : trace_reader_(std::move(reader)) {}
+
+Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
+ assert(header != nullptr);
+ std::string encoded_trace;
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ Trace trace;
+ s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+ if (!s.ok()) {
+ return s;
+ }
+ header->start_time = trace.ts;
+ Slice enc_slice = Slice(trace.payload);
+ Slice magnic_number;
+ if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read the magic number.");
+ }
+ if (magnic_number.ToString() != kTraceMagic) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Magic number does not match.");
+ }
+ if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read rocksdb major "
+ "version number.");
+ }
+ if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read rocksdb minor "
+ "version number.");
+ }
+ // We should have retrieved all information in the header.
+ if (!enc_slice.empty()) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: The length of header is too "
+ "long.");
+ }
+ return Status::OK();
+}
+
+Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
+ assert(record);
+ std::string encoded_trace;
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ Trace trace;
+ s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+ if (!s.ok()) {
+ return s;
+ }
+ record->access_timestamp = trace.ts;
+ record->block_type = trace.type;
+ Slice enc_slice = Slice(trace.payload);
+
+ const unsigned int kCharSize = 1;
+
+ Slice block_key;
+ if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read block key.");
+ }
+ record->block_key = block_key.ToString();
+ if (!GetFixed64(&enc_slice, &record->block_size)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read block size.");
+ }
+ if (!GetFixed64(&enc_slice, &record->cf_id)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read column family ID.");
+ }
+ Slice cf_name;
+ if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read column family name.");
+ }
+ record->cf_name = cf_name.ToString();
+ if (!GetFixed32(&enc_slice, &record->level)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read level.");
+ }
+ if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read SST file number.");
+ }
+ if (enc_slice.empty()) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read caller.");
+ }
+ record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
+ enc_slice.remove_prefix(kCharSize);
+ if (enc_slice.empty()) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read is_cache_hit.");
+ }
+ record->is_cache_hit = static_cast<char>(enc_slice[0]);
+ enc_slice.remove_prefix(kCharSize);
+ if (enc_slice.empty()) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read no_insert.");
+ }
+ record->no_insert = static_cast<char>(enc_slice[0]);
+ enc_slice.remove_prefix(kCharSize);
+ if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
+ if (!GetFixed64(&enc_slice, &record->get_id)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read the get id.");
+ }
+ if (enc_slice.empty()) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read "
+ "get_from_user_specified_snapshot.");
+ }
+ record->get_from_user_specified_snapshot = static_cast<char>(enc_slice[0]);
+ enc_slice.remove_prefix(kCharSize);
+ Slice referenced_key;
+ if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read the referenced key.");
+ }
+ record->referenced_key = referenced_key.ToString();
+ }
+ if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
+ record->caller)) {
+ if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read the referenced data size.");
+ }
+ if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read the number of keys in the "
+ "block.");
+ }
+ if (enc_slice.empty()) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read "
+ "referenced_key_exist_in_block.");
+ }
+ record->referenced_key_exist_in_block = static_cast<char>(enc_slice[0]);
+ }
+ return Status::OK();
+}
+
+BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
+ if (human_readable_trace_file_writer_) {
+ human_readable_trace_file_writer_->Flush().PermitUncheckedError();
+ human_readable_trace_file_writer_->Close().PermitUncheckedError();
+ }
+}
+
+Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
+ const std::string& human_readable_trace_file_path,
+ ROCKSDB_NAMESPACE::Env* env) {
+ if (human_readable_trace_file_path.empty()) {
+ return Status::InvalidArgument(
+ "The provided human_readable_trace_file_path is null.");
+ }
+ return env->NewWritableFile(human_readable_trace_file_path,
+ &human_readable_trace_file_writer_, EnvOptions());
+}
+
+Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
+ const BlockCacheTraceRecord& access, uint64_t block_id,
+ uint64_t get_key_id) {
+ if (!human_readable_trace_file_writer_) {
+ return Status::OK();
+ }
+ int ret = snprintf(
+ trace_record_buffer_, sizeof(trace_record_buffer_),
+ "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
+ ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
+ ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
+ access.access_timestamp, block_id, access.block_type, access.block_size,
+ access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
+ access.caller, access.no_insert, access.get_id, get_key_id,
+ access.referenced_data_size, access.is_cache_hit,
+ access.referenced_key_exist_in_block, access.num_keys_in_block,
+ BlockCacheTraceHelper::GetTableId(access),
+ BlockCacheTraceHelper::GetSequenceNumber(access),
+ static_cast<uint64_t>(access.block_key.size()),
+ static_cast<uint64_t>(access.referenced_key.size()),
+ BlockCacheTraceHelper::GetBlockOffsetInFile(access));
+ if (ret < 0) {
+ return Status::IOError("failed to format the output");
+ }
+ std::string printout(trace_record_buffer_);
+ return human_readable_trace_file_writer_->Append(printout);
+}
+
+BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
+ const std::string& trace_file_path)
+ : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
+ human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
+}
+
+BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
+ human_readable_trace_reader_.close();
+}
+
+Status BlockCacheHumanReadableTraceReader::ReadHeader(
+ BlockCacheTraceHeader* /*header*/) {
+ return Status::OK();
+}
+
+Status BlockCacheHumanReadableTraceReader::ReadAccess(
+ BlockCacheTraceRecord* record) {
+ std::string line;
+ if (!std::getline(human_readable_trace_reader_, line)) {
+ return Status::Incomplete("No more records to read.");
+ }
+ std::stringstream ss(line);
+ std::vector<std::string> record_strs;
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+ record_strs.push_back(substr);
+ }
+ if (record_strs.size() != 21) {
+ return Status::Incomplete("Records format is wrong.");
+ }
+
+ record->access_timestamp = ParseUint64(record_strs[0]);
+ uint64_t block_key = ParseUint64(record_strs[1]);
+ record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
+ record->block_size = ParseUint64(record_strs[3]);
+ record->cf_id = ParseUint64(record_strs[4]);
+ record->cf_name = record_strs[5];
+ record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
+ record->sst_fd_number = ParseUint64(record_strs[7]);
+ record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
+ record->no_insert = static_cast<char>(ParseUint64(record_strs[9]));
+ record->get_id = ParseUint64(record_strs[10]);
+ uint64_t get_key_id = ParseUint64(record_strs[11]);
+
+ record->referenced_data_size = ParseUint64(record_strs[12]);
+ record->is_cache_hit = static_cast<char>(ParseUint64(record_strs[13]));
+ record->referenced_key_exist_in_block =
+ static_cast<char>(ParseUint64(record_strs[14]));
+ record->num_keys_in_block = ParseUint64(record_strs[15]);
+ uint64_t table_id = ParseUint64(record_strs[16]);
+ if (table_id > 0) {
+ // Decrement since valid table id in the trace file equals traced table id
+ // + 1.
+ table_id -= 1;
+ }
+ uint64_t get_sequence_number = ParseUint64(record_strs[17]);
+ if (get_sequence_number > 0) {
+ record->get_from_user_specified_snapshot = true;
+ // Decrement since valid seq number in the trace file equals traced seq
+ // number + 1.
+ get_sequence_number -= 1;
+ }
+ uint64_t block_key_size = ParseUint64(record_strs[18]);
+ uint64_t get_key_size = ParseUint64(record_strs[19]);
+ uint64_t block_offset = ParseUint64(record_strs[20]);
+
+ std::string tmp_block_key;
+ PutVarint64(&tmp_block_key, block_key);
+ PutVarint64(&tmp_block_key, block_offset);
+ // Append 1 until the size is the same as traced block key size.
+ while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
+ record->block_key += "1";
+ }
+ record->block_key += tmp_block_key;
+
+ if (get_key_id != 0) {
+ std::string tmp_get_key;
+ PutFixed64(&tmp_get_key, get_key_id);
+ PutFixed64(&tmp_get_key, get_sequence_number << 8);
+ PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
+ // Append 1 until the size is the same as traced key size.
+ while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
+ record->referenced_key += "1";
+ }
+ record->referenced_key += tmp_get_key;
+ }
+ return Status::OK();
+}
+
+BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
+
+BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
+
+Status BlockCacheTracer::StartTrace(
+ const BlockCacheTraceOptions& trace_options,
+ std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) {
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (writer_.load()) {
+ return Status::Busy();
+ }
+ get_id_counter_.store(1);
+ trace_options_ = trace_options;
+ writer_.store(trace_writer.release());
+ return writer_.load()->WriteHeader();
+}
+
+void BlockCacheTracer::EndTrace() {
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (!writer_.load()) {
+ return;
+ }
+ delete writer_.load();
+ writer_.store(nullptr);
+}
+
+Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
+ const Slice& block_key,
+ const Slice& cf_name,
+ const Slice& referenced_key) {
+ if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
+ return Status::OK();
+ }
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (!writer_.load()) {
+ return Status::OK();
+ }
+ return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
+ referenced_key);
+}
+
+uint64_t BlockCacheTracer::NextGetId() {
+ if (!writer_.load(std::memory_order_relaxed)) {
+ return BlockCacheTraceHelper::kReservedGetId;
+ }
+ uint64_t prev_value = get_id_counter_.fetch_add(1);
+ if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
+ // fetch and add again.
+ return get_id_counter_.fetch_add(1);
+ }
+ return prev_value;
+}
+
+std::unique_ptr<BlockCacheTraceWriter> NewBlockCacheTraceWriter(
+ SystemClock* clock, const BlockCacheTraceWriterOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer) {
+ return std::unique_ptr<BlockCacheTraceWriter>(new BlockCacheTraceWriterImpl(
+ clock, trace_options, std::move(trace_writer)));
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/block_cache_tracer.h b/src/rocksdb/trace_replay/block_cache_tracer.h
new file mode 100644
index 000000000..4a749608f
--- /dev/null
+++ b/src/rocksdb/trace_replay/block_cache_tracer.h
@@ -0,0 +1,239 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <atomic>
+#include <fstream>
+
+#include "monitoring/instrumented_mutex.h"
+#include "rocksdb/block_cache_trace_writer.h"
+#include "rocksdb/options.h"
+#include "rocksdb/table_reader_caller.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+class Env;
+class SystemClock;
+
+extern const uint64_t kMicrosInSecond;
+extern const uint64_t kSecondInMinute;
+extern const uint64_t kSecondInHour;
+
+struct BlockCacheTraceRecord;
+
+class BlockCacheTraceHelper {
+ public:
+ static bool IsGetOrMultiGetOnDataBlock(TraceType block_type,
+ TableReaderCaller caller);
+ static bool IsGetOrMultiGet(TableReaderCaller caller);
+ static bool IsUserAccess(TableReaderCaller caller);
+ // Row key is a concatenation of the access's fd_number and the referenced
+ // user key.
+ static std::string ComputeRowKey(const BlockCacheTraceRecord& access);
+ // The first four bytes of the referenced key in a Get request is the table
+ // id.
+ static uint64_t GetTableId(const BlockCacheTraceRecord& access);
+ // The sequence number of a get request is the last part of the referenced
+ // key.
+ static uint64_t GetSequenceNumber(const BlockCacheTraceRecord& access);
+ // Block offset in a file is the last varint64 in the block key.
+ static uint64_t GetBlockOffsetInFile(const BlockCacheTraceRecord& access);
+
+ static const std::string kUnknownColumnFamilyName;
+ static const uint64_t kReservedGetId;
+};
+
+// Lookup context for tracing block cache accesses.
+// We trace block accesses at five places:
+// 1. BlockBasedTable::GetFilter
+// 2. BlockBasedTable::GetUncompressedDict.
+// 3. BlockBasedTable::MaybeReadAndLoadToCache. (To trace access on data, index,
+// and range deletion block.)
+// 4. BlockBasedTable::Get. (To trace the referenced key and whether the
+// referenced key exists in a fetched data block.)
+// 5. BlockBasedTable::MultiGet. (To trace the referenced key and whether the
+// referenced key exists in a fetched data block.)
+// The context is created at:
+// 1. BlockBasedTable::Get. (kUserGet)
+// 2. BlockBasedTable::MultiGet. (kUserMGet)
+// 3. BlockBasedTable::NewIterator. (either kUserIterator, kCompaction, or
+// external SST ingestion calls this function.)
+// 4. BlockBasedTable::Open. (kPrefetch)
+// 5. Index/Filter::CacheDependencies. (kPrefetch)
+// 6. BlockBasedTable::ApproximateOffsetOf. (kCompaction or
+// kUserApproximateSize).
+struct BlockCacheLookupContext {
+ BlockCacheLookupContext(const TableReaderCaller& _caller) : caller(_caller) {}
+ BlockCacheLookupContext(const TableReaderCaller& _caller, uint64_t _get_id,
+ bool _get_from_user_specified_snapshot)
+ : caller(_caller),
+ get_id(_get_id),
+ get_from_user_specified_snapshot(_get_from_user_specified_snapshot) {}
+ const TableReaderCaller caller;
+ // These are populated when we perform lookup/insert on block cache. The block
+ // cache tracer uses these inforation when logging the block access at
+ // BlockBasedTable::GET and BlockBasedTable::MultiGet.
+ bool is_cache_hit = false;
+ bool no_insert = false;
+ TraceType block_type = TraceType::kTraceMax;
+ uint64_t block_size = 0;
+ std::string block_key;
+ uint64_t num_keys_in_block = 0;
+ // The unique id associated with Get and MultiGet. This enables us to track
+ // how many blocks a Get/MultiGet request accesses. We can also measure the
+ // impact of row cache vs block cache.
+ uint64_t get_id = 0;
+ std::string referenced_key;
+ bool get_from_user_specified_snapshot = false;
+
+ void FillLookupContext(bool _is_cache_hit, bool _no_insert,
+ TraceType _block_type, uint64_t _block_size,
+ const std::string& _block_key,
+ uint64_t _num_keys_in_block) {
+ is_cache_hit = _is_cache_hit;
+ no_insert = _no_insert;
+ block_type = _block_type;
+ block_size = _block_size;
+ block_key = _block_key;
+ num_keys_in_block = _num_keys_in_block;
+ }
+};
+
+struct BlockCacheTraceHeader {
+ uint64_t start_time;
+ uint32_t rocksdb_major_version;
+ uint32_t rocksdb_minor_version;
+};
+
+// BlockCacheTraceWriter captures all RocksDB block cache accesses using a
+// user-provided TraceWriter. Every RocksDB operation is written as a single
+// trace. Each trace will have a timestamp and type, followed by the trace
+// payload.
+class BlockCacheTraceWriterImpl : public BlockCacheTraceWriter {
+ public:
+ BlockCacheTraceWriterImpl(SystemClock* clock,
+ const BlockCacheTraceWriterOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer);
+ ~BlockCacheTraceWriterImpl() = default;
+ // No copy and move.
+ BlockCacheTraceWriterImpl(const BlockCacheTraceWriterImpl&) = delete;
+ BlockCacheTraceWriterImpl& operator=(const BlockCacheTraceWriterImpl&) =
+ delete;
+ BlockCacheTraceWriterImpl(BlockCacheTraceWriterImpl&&) = delete;
+ BlockCacheTraceWriterImpl& operator=(BlockCacheTraceWriterImpl&&) = delete;
+
+ // Pass Slice references to avoid copy.
+ Status WriteBlockAccess(const BlockCacheTraceRecord& record,
+ const Slice& block_key, const Slice& cf_name,
+ const Slice& referenced_key);
+
+ // Write a trace header at the beginning, typically on initiating a trace,
+ // with some metadata like a magic number and RocksDB version.
+ Status WriteHeader();
+
+ private:
+ SystemClock* clock_;
+ BlockCacheTraceWriterOptions trace_options_;
+ std::unique_ptr<TraceWriter> trace_writer_;
+};
+
+// Write a trace record in human readable format, see
+// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format
+// for details.
+class BlockCacheHumanReadableTraceWriter {
+ public:
+ ~BlockCacheHumanReadableTraceWriter();
+
+ Status NewWritableFile(const std::string& human_readable_trace_file_path,
+ ROCKSDB_NAMESPACE::Env* env);
+
+ Status WriteHumanReadableTraceRecord(const BlockCacheTraceRecord& access,
+ uint64_t block_id, uint64_t get_key_id);
+
+ private:
+ char trace_record_buffer_[1024 * 1024];
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
+ human_readable_trace_file_writer_;
+};
+
+// BlockCacheTraceReader helps read the trace file generated by
+// BlockCacheTraceWriter using a user provided TraceReader.
+class BlockCacheTraceReader {
+ public:
+ BlockCacheTraceReader(std::unique_ptr<TraceReader>&& reader);
+ virtual ~BlockCacheTraceReader() = default;
+ // No copy and move.
+ BlockCacheTraceReader(const BlockCacheTraceReader&) = delete;
+ BlockCacheTraceReader& operator=(const BlockCacheTraceReader&) = delete;
+ BlockCacheTraceReader(BlockCacheTraceReader&&) = delete;
+ BlockCacheTraceReader& operator=(BlockCacheTraceReader&&) = delete;
+
+ Status ReadHeader(BlockCacheTraceHeader* header);
+
+ Status ReadAccess(BlockCacheTraceRecord* record);
+
+ private:
+ std::unique_ptr<TraceReader> trace_reader_;
+};
+
+// Read a trace record in human readable format, see
+// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format
+// for detailed.
+class BlockCacheHumanReadableTraceReader : public BlockCacheTraceReader {
+ public:
+ BlockCacheHumanReadableTraceReader(const std::string& trace_file_path);
+
+ ~BlockCacheHumanReadableTraceReader();
+
+ Status ReadHeader(BlockCacheTraceHeader* header);
+
+ Status ReadAccess(BlockCacheTraceRecord* record);
+
+ private:
+ std::ifstream human_readable_trace_reader_;
+};
+
+// A block cache tracer. It downsamples the accesses according to
+// trace_options and uses BlockCacheTraceWriter to write the access record to
+// the trace file.
+class BlockCacheTracer {
+ public:
+ BlockCacheTracer();
+ ~BlockCacheTracer();
+ // No copy and move.
+ BlockCacheTracer(const BlockCacheTracer&) = delete;
+ BlockCacheTracer& operator=(const BlockCacheTracer&) = delete;
+ BlockCacheTracer(BlockCacheTracer&&) = delete;
+ BlockCacheTracer& operator=(BlockCacheTracer&&) = delete;
+
+ // Start writing block cache accesses to the trace_writer.
+ Status StartTrace(const BlockCacheTraceOptions& trace_options,
+ std::unique_ptr<BlockCacheTraceWriter>&& trace_writer);
+
+ // Stop writing block cache accesses to the trace_writer.
+ void EndTrace();
+
+ bool is_tracing_enabled() const {
+ return writer_.load(std::memory_order_relaxed);
+ }
+
+ Status WriteBlockAccess(const BlockCacheTraceRecord& record,
+ const Slice& block_key, const Slice& cf_name,
+ const Slice& referenced_key);
+
+ // GetId cycles from 1 to std::numeric_limits<uint64_t>::max().
+ uint64_t NextGetId();
+
+ private:
+ BlockCacheTraceOptions trace_options_;
+ // A mutex protects the writer_.
+ InstrumentedMutex trace_writer_mutex_;
+ std::atomic<BlockCacheTraceWriter*> writer_;
+ std::atomic<uint64_t> get_id_counter_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/block_cache_tracer_test.cc b/src/rocksdb/trace_replay/block_cache_tracer_test.cc
new file mode 100644
index 000000000..f9d0773bf
--- /dev/null
+++ b/src/rocksdb/trace_replay/block_cache_tracer_test.cc
@@ -0,0 +1,421 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/block_cache_tracer.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+const uint64_t kBlockSize = 1024;
+const std::string kBlockKeyPrefix = "test-block-";
+const uint32_t kCFId = 0;
+const uint32_t kLevel = 1;
+const uint64_t kSSTFDNumber = 100;
+const std::string kRefKeyPrefix = "test-get-";
+const uint64_t kNumKeysInBlock = 1024;
+const uint64_t kReferencedDataSize = 10;
+} // namespace
+
+class BlockCacheTracerTest : public testing::Test {
+ public:
+ BlockCacheTracerTest() {
+ test_path_ = test::PerThreadDBPath("block_cache_tracer_test");
+ env_ = ROCKSDB_NAMESPACE::Env::Default();
+ clock_ = env_->GetSystemClock().get();
+ EXPECT_OK(env_->CreateDir(test_path_));
+ trace_file_path_ = test_path_ + "/block_cache_trace";
+ }
+
+ ~BlockCacheTracerTest() override {
+ EXPECT_OK(env_->DeleteFile(trace_file_path_));
+ EXPECT_OK(env_->DeleteDir(test_path_));
+ }
+
+ TableReaderCaller GetCaller(uint32_t key_id) {
+ uint32_t n = key_id % 5;
+ switch (n) {
+ case 0:
+ return TableReaderCaller::kPrefetch;
+ case 1:
+ return TableReaderCaller::kCompaction;
+ case 2:
+ return TableReaderCaller::kUserGet;
+ case 3:
+ return TableReaderCaller::kUserMultiGet;
+ case 4:
+ return TableReaderCaller::kUserIterator;
+ }
+ assert(false);
+ return TableReaderCaller::kMaxBlockCacheLookupCaller;
+ }
+
+ void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id,
+ TraceType block_type, uint32_t nblocks) {
+ assert(writer);
+ for (uint32_t i = 0; i < nblocks; i++) {
+ uint32_t key_id = from_key_id + i;
+ BlockCacheTraceRecord record;
+ record.block_type = block_type;
+ record.block_size = kBlockSize + key_id;
+ record.block_key = (kBlockKeyPrefix + std::to_string(key_id));
+ record.access_timestamp = clock_->NowMicros();
+ record.cf_id = kCFId;
+ record.cf_name = kDefaultColumnFamilyName;
+ record.caller = GetCaller(key_id);
+ record.level = kLevel;
+ record.sst_fd_number = kSSTFDNumber + key_id;
+ record.is_cache_hit = false;
+ record.no_insert = false;
+ // Provide get_id for all callers. The writer should only write get_id
+ // when the caller is either GET or MGET.
+ record.get_id = key_id + 1;
+ record.get_from_user_specified_snapshot = true;
+ // Provide these fields for all block types.
+ // The writer should only write these fields for data blocks and the
+ // caller is either GET or MGET.
+ record.referenced_key = (kRefKeyPrefix + std::to_string(key_id));
+ record.referenced_key_exist_in_block = true;
+ record.num_keys_in_block = kNumKeysInBlock;
+ record.referenced_data_size = kReferencedDataSize + key_id;
+ ASSERT_OK(writer->WriteBlockAccess(
+ record, record.block_key, record.cf_name, record.referenced_key));
+ }
+ }
+
+ BlockCacheTraceRecord GenerateAccessRecord() {
+ uint32_t key_id = 0;
+ BlockCacheTraceRecord record;
+ record.block_type = TraceType::kBlockTraceDataBlock;
+ record.block_size = kBlockSize;
+ record.block_key = kBlockKeyPrefix + std::to_string(key_id);
+ record.access_timestamp = clock_->NowMicros();
+ record.cf_id = kCFId;
+ record.cf_name = kDefaultColumnFamilyName;
+ record.caller = GetCaller(key_id);
+ record.level = kLevel;
+ record.sst_fd_number = kSSTFDNumber + key_id;
+ record.is_cache_hit = false;
+ record.no_insert = false;
+ record.referenced_key = kRefKeyPrefix + std::to_string(key_id);
+ record.referenced_key_exist_in_block = true;
+ record.num_keys_in_block = kNumKeysInBlock;
+ return record;
+ }
+
+ void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id,
+ TraceType block_type, uint32_t nblocks) {
+ assert(reader);
+ for (uint32_t i = 0; i < nblocks; i++) {
+ uint32_t key_id = from_key_id + i;
+ BlockCacheTraceRecord record;
+ ASSERT_OK(reader->ReadAccess(&record));
+ ASSERT_EQ(block_type, record.block_type);
+ ASSERT_EQ(kBlockSize + key_id, record.block_size);
+ ASSERT_EQ(kBlockKeyPrefix + std::to_string(key_id), record.block_key);
+ ASSERT_EQ(kCFId, record.cf_id);
+ ASSERT_EQ(kDefaultColumnFamilyName, record.cf_name);
+ ASSERT_EQ(GetCaller(key_id), record.caller);
+ ASSERT_EQ(kLevel, record.level);
+ ASSERT_EQ(kSSTFDNumber + key_id, record.sst_fd_number);
+ ASSERT_FALSE(record.is_cache_hit);
+ ASSERT_FALSE(record.no_insert);
+ if (record.caller == TableReaderCaller::kUserGet ||
+ record.caller == TableReaderCaller::kUserMultiGet) {
+ ASSERT_EQ(key_id + 1, record.get_id);
+ ASSERT_TRUE(record.get_from_user_specified_snapshot);
+ ASSERT_EQ(kRefKeyPrefix + std::to_string(key_id),
+ record.referenced_key);
+ } else {
+ ASSERT_EQ(BlockCacheTraceHelper::kReservedGetId, record.get_id);
+ ASSERT_FALSE(record.get_from_user_specified_snapshot);
+ ASSERT_EQ("", record.referenced_key);
+ }
+ if (block_type == TraceType::kBlockTraceDataBlock &&
+ (record.caller == TableReaderCaller::kUserGet ||
+ record.caller == TableReaderCaller::kUserMultiGet)) {
+ ASSERT_TRUE(record.referenced_key_exist_in_block);
+ ASSERT_EQ(kNumKeysInBlock, record.num_keys_in_block);
+ ASSERT_EQ(kReferencedDataSize + key_id, record.referenced_data_size);
+ continue;
+ }
+ ASSERT_FALSE(record.referenced_key_exist_in_block);
+ ASSERT_EQ(0, record.num_keys_in_block);
+ ASSERT_EQ(0, record.referenced_data_size);
+ }
+ }
+
+ Env* env_;
+ SystemClock* clock_;
+ EnvOptions env_options_;
+ std::string trace_file_path_;
+ std::string test_path_;
+};
+
+TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) {
+ BlockCacheTraceRecord record = GenerateAccessRecord();
+ {
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ BlockCacheTracer writer;
+ // The record should be written to the trace_file since StartTrace is not
+ // called.
+ ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+ record.referenced_key));
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file contains nothing.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ BlockCacheTraceReader reader(std::move(trace_reader));
+ BlockCacheTraceHeader header;
+ ASSERT_NOK(reader.ReadHeader(&header));
+ }
+}
+
+TEST_F(BlockCacheTracerTest, AtomicWrite) {
+ BlockCacheTraceRecord record = GenerateAccessRecord();
+ {
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ BlockCacheTraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ BlockCacheTracer writer;
+ ASSERT_OK(
+ writer.StartTrace(trace_opt, std::move(block_cache_trace_writer)));
+ ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+ record.referenced_key));
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file contains one record.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ BlockCacheTraceReader reader(std::move(trace_reader));
+ BlockCacheTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+ VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1);
+ ASSERT_NOK(reader.ReadAccess(&record));
+ }
+}
+
+TEST_F(BlockCacheTracerTest, ConsecutiveStartTrace) {
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ BlockCacheTraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(
+ NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ BlockCacheTracer writer;
+ ASSERT_OK(writer.StartTrace(trace_opt, std::move(block_cache_trace_writer)));
+ ASSERT_NOK(writer.StartTrace(trace_opt, std::move(block_cache_trace_writer)));
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+}
+
+TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) {
+ BlockCacheTraceRecord record = GenerateAccessRecord();
+ {
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ BlockCacheTraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ BlockCacheTracer writer;
+ ASSERT_OK(
+ writer.StartTrace(trace_opt, std::move(block_cache_trace_writer)));
+ ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+ record.referenced_key));
+ writer.EndTrace();
+ // Write the record again. This time the record should not be written since
+ // EndTrace is called.
+ ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+ record.referenced_key));
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file contains one record.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ BlockCacheTraceReader reader(std::move(trace_reader));
+ BlockCacheTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+ VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1);
+ ASSERT_NOK(reader.ReadAccess(&record));
+ }
+}
+
+TEST_F(BlockCacheTracerTest, NextGetId) {
+ BlockCacheTracer writer;
+ {
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ BlockCacheTraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ // next get id should always return 0 before we call StartTrace.
+ ASSERT_EQ(0, writer.NextGetId());
+ ASSERT_EQ(0, writer.NextGetId());
+ ASSERT_OK(
+ writer.StartTrace(trace_opt, std::move(block_cache_trace_writer)));
+ ASSERT_EQ(1, writer.NextGetId());
+ ASSERT_EQ(2, writer.NextGetId());
+ writer.EndTrace();
+ // next get id should return 0.
+ ASSERT_EQ(0, writer.NextGetId());
+ }
+
+ // Start trace again and next get id should return 1.
+ {
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ BlockCacheTraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ ASSERT_OK(
+ writer.StartTrace(trace_opt, std::move(block_cache_trace_writer)));
+ ASSERT_EQ(1, writer.NextGetId());
+ }
+}
+
+TEST_F(BlockCacheTracerTest, MixedBlocks) {
+ {
+ // Generate a trace file containing a mix of blocks.
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ ASSERT_OK(block_cache_trace_writer->WriteHeader());
+ // Write blocks of different types.
+ WriteBlockAccess(block_cache_trace_writer.get(), 0,
+ TraceType::kBlockTraceUncompressionDictBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 10,
+ TraceType::kBlockTraceDataBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 20,
+ TraceType::kBlockTraceFilterBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 30,
+ TraceType::kBlockTraceIndexBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 40,
+ TraceType::kBlockTraceRangeDeletionBlock, 10);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+
+ {
+ // Verify trace file is generated correctly.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ BlockCacheTraceReader reader(std::move(trace_reader));
+ BlockCacheTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+ // Read blocks.
+ VerifyAccess(&reader, 0, TraceType::kBlockTraceUncompressionDictBlock, 10);
+ VerifyAccess(&reader, 10, TraceType::kBlockTraceDataBlock, 10);
+ VerifyAccess(&reader, 20, TraceType::kBlockTraceFilterBlock, 10);
+ VerifyAccess(&reader, 30, TraceType::kBlockTraceIndexBlock, 10);
+ VerifyAccess(&reader, 40, TraceType::kBlockTraceRangeDeletionBlock, 10);
+ // Read one more record should report an error.
+ BlockCacheTraceRecord record;
+ ASSERT_NOK(reader.ReadAccess(&record));
+ }
+}
+
+TEST_F(BlockCacheTracerTest, HumanReadableTrace) {
+ BlockCacheTraceRecord record = GenerateAccessRecord();
+ record.get_id = 1;
+ record.referenced_key = "";
+ record.caller = TableReaderCaller::kUserGet;
+ record.get_from_user_specified_snapshot = true;
+ record.referenced_data_size = kReferencedDataSize;
+ PutFixed32(&record.referenced_key, 111);
+ PutLengthPrefixedSlice(&record.referenced_key, "get_key");
+ PutFixed64(&record.referenced_key, 2 << 8);
+ PutLengthPrefixedSlice(&record.block_key, "block_key");
+ PutVarint64(&record.block_key, 333);
+ {
+ // Generate a human readable trace file.
+ BlockCacheHumanReadableTraceWriter writer;
+ ASSERT_OK(writer.NewWritableFile(trace_file_path_, env_));
+ ASSERT_OK(writer.WriteHumanReadableTraceRecord(record, 1, 1));
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ BlockCacheHumanReadableTraceReader reader(trace_file_path_);
+ BlockCacheTraceHeader header;
+ BlockCacheTraceRecord read_record;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_OK(reader.ReadAccess(&read_record));
+ ASSERT_EQ(TraceType::kBlockTraceDataBlock, read_record.block_type);
+ ASSERT_EQ(kBlockSize, read_record.block_size);
+ ASSERT_EQ(kCFId, read_record.cf_id);
+ ASSERT_EQ(kDefaultColumnFamilyName, read_record.cf_name);
+ ASSERT_EQ(TableReaderCaller::kUserGet, read_record.caller);
+ ASSERT_EQ(kLevel, read_record.level);
+ ASSERT_EQ(kSSTFDNumber, read_record.sst_fd_number);
+ ASSERT_FALSE(read_record.is_cache_hit);
+ ASSERT_FALSE(read_record.no_insert);
+ ASSERT_EQ(1, read_record.get_id);
+ ASSERT_TRUE(read_record.get_from_user_specified_snapshot);
+ ASSERT_TRUE(read_record.referenced_key_exist_in_block);
+ ASSERT_EQ(kNumKeysInBlock, read_record.num_keys_in_block);
+ ASSERT_EQ(kReferencedDataSize, read_record.referenced_data_size);
+ ASSERT_EQ(record.block_key.size(), read_record.block_key.size());
+ ASSERT_EQ(record.referenced_key.size(), record.referenced_key.size());
+ ASSERT_EQ(112, BlockCacheTraceHelper::GetTableId(read_record));
+ ASSERT_EQ(3, BlockCacheTraceHelper::GetSequenceNumber(read_record));
+ ASSERT_EQ(333, BlockCacheTraceHelper::GetBlockOffsetInFile(read_record));
+ // Read again should fail.
+ ASSERT_NOK(reader.ReadAccess(&read_record));
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/trace_replay/io_tracer.cc b/src/rocksdb/trace_replay/io_tracer.cc
new file mode 100644
index 000000000..a860130f8
--- /dev/null
+++ b/src/rocksdb/trace_replay/io_tracer.cc
@@ -0,0 +1,303 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/io_tracer.h"
+
+#include <cinttypes>
+#include <cstdio>
+#include <cstdlib>
+
+#include "db/db_impl/db_impl.h"
+#include "db/dbformat.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "util/coding.h"
+#include "util/hash.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+IOTraceWriter::IOTraceWriter(SystemClock* clock,
+ const TraceOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer)
+ : clock_(clock),
+ trace_options_(trace_options),
+ trace_writer_(std::move(trace_writer)) {}
+
+Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record,
+ IODebugContext* dbg) {
+ uint64_t trace_file_size = trace_writer_->GetFileSize();
+ if (trace_file_size > trace_options_.max_trace_file_size) {
+ return Status::OK();
+ }
+ Trace trace;
+ trace.ts = record.access_timestamp;
+ trace.type = record.trace_type;
+ PutFixed64(&trace.payload, record.io_op_data);
+ Slice file_operation(record.file_operation);
+ PutLengthPrefixedSlice(&trace.payload, file_operation);
+ PutFixed64(&trace.payload, record.latency);
+ Slice io_status(record.io_status);
+ PutLengthPrefixedSlice(&trace.payload, io_status);
+ Slice file_name(record.file_name);
+ PutLengthPrefixedSlice(&trace.payload, file_name);
+
+ // Each bit in io_op_data stores which corresponding info from IOTraceOp will
+ // be added in the trace. Foreg, if bit at position 1 is set then
+ // IOTraceOp::kIOLen (length) will be logged in the record (Since
+ // IOTraceOp::kIOLen = 1 in the enum). So find all the set positions in
+ // io_op_data one by one and, update corresponsing info in the trace record,
+ // unset that bit to find other set bits until io_op_data = 0.
+ /* Write remaining options based on io_op_data set by file operation */
+ int64_t io_op_data = static_cast<int64_t>(record.io_op_data);
+ while (io_op_data) {
+ // Find the rightmost set bit.
+ uint32_t set_pos = static_cast<uint32_t>(log2(io_op_data & -io_op_data));
+ switch (set_pos) {
+ case IOTraceOp::kIOFileSize:
+ PutFixed64(&trace.payload, record.file_size);
+ break;
+ case IOTraceOp::kIOLen:
+ PutFixed64(&trace.payload, record.len);
+ break;
+ case IOTraceOp::kIOOffset:
+ PutFixed64(&trace.payload, record.offset);
+ break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ io_op_data &= (io_op_data - 1);
+ }
+
+ int64_t trace_data = 0;
+ if (dbg) {
+ trace_data = static_cast<int64_t>(dbg->trace_data);
+ }
+ PutFixed64(&trace.payload, trace_data);
+ while (trace_data) {
+ // Find the rightmost set bit.
+ uint32_t set_pos = static_cast<uint32_t>(log2(trace_data & -trace_data));
+ switch (set_pos) {
+ case IODebugContext::TraceData::kRequestID: {
+ Slice request_id(dbg->request_id);
+ PutLengthPrefixedSlice(&trace.payload, request_id);
+ } break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ trace_data &= (trace_data - 1);
+ }
+
+ std::string encoded_trace;
+ TracerHelper::EncodeTrace(trace, &encoded_trace);
+ return trace_writer_->Write(encoded_trace);
+}
+
+Status IOTraceWriter::WriteHeader() {
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = TraceType::kTraceBegin;
+ PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
+ PutFixed32(&trace.payload, kMajorVersion);
+ PutFixed32(&trace.payload, kMinorVersion);
+ std::string encoded_trace;
+ TracerHelper::EncodeTrace(trace, &encoded_trace);
+ return trace_writer_->Write(encoded_trace);
+}
+
+IOTraceReader::IOTraceReader(std::unique_ptr<TraceReader>&& reader)
+ : trace_reader_(std::move(reader)) {}
+
+Status IOTraceReader::ReadHeader(IOTraceHeader* header) {
+ assert(header != nullptr);
+ std::string encoded_trace;
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ Trace trace;
+ s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+ if (!s.ok()) {
+ return s;
+ }
+ header->start_time = trace.ts;
+ Slice enc_slice = Slice(trace.payload);
+ Slice magic_number;
+ if (!GetLengthPrefixedSlice(&enc_slice, &magic_number)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read the magic number.");
+ }
+ if (magic_number.ToString() != kTraceMagic) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Magic number does not match.");
+ }
+ if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read rocksdb major "
+ "version number.");
+ }
+ if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read rocksdb minor "
+ "version number.");
+ }
+ // We should have retrieved all information in the header.
+ if (!enc_slice.empty()) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: The length of header is too "
+ "long.");
+ }
+ return Status::OK();
+}
+
+Status IOTraceReader::ReadIOOp(IOTraceRecord* record) {
+ assert(record);
+ std::string encoded_trace;
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ Trace trace;
+ s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+ if (!s.ok()) {
+ return s;
+ }
+ record->access_timestamp = trace.ts;
+ record->trace_type = trace.type;
+ Slice enc_slice = Slice(trace.payload);
+
+ if (!GetFixed64(&enc_slice, &record->io_op_data)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read trace data.");
+ }
+ Slice file_operation;
+ if (!GetLengthPrefixedSlice(&enc_slice, &file_operation)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read file operation.");
+ }
+ record->file_operation = file_operation.ToString();
+ if (!GetFixed64(&enc_slice, &record->latency)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read latency.");
+ }
+ Slice io_status;
+ if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read IO status.");
+ }
+ record->io_status = io_status.ToString();
+ Slice file_name;
+ if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read file name.");
+ }
+ record->file_name = file_name.ToString();
+
+ // Each bit in io_op_data stores which corresponding info from IOTraceOp will
+ // be added in the trace. Foreg, if bit at position 1 is set then
+ // IOTraceOp::kIOLen (length) will be logged in the record (Since
+ // IOTraceOp::kIOLen = 1 in the enum). So find all the set positions in
+ // io_op_data one by one and, update corresponsing info in the trace record,
+ // unset that bit to find other set bits until io_op_data = 0.
+ /* Read remaining options based on io_op_data set by file operation */
+ // Assuming 63 bits will be used at max.
+ int64_t io_op_data = static_cast<int64_t>(record->io_op_data);
+ while (io_op_data) {
+ // Find the rightmost set bit.
+ uint32_t set_pos = static_cast<uint32_t>(log2(io_op_data & -io_op_data));
+ switch (set_pos) {
+ case IOTraceOp::kIOFileSize:
+ if (!GetFixed64(&enc_slice, &record->file_size)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read file size.");
+ }
+ break;
+ case IOTraceOp::kIOLen:
+ if (!GetFixed64(&enc_slice, &record->len)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read length.");
+ }
+ break;
+ case IOTraceOp::kIOOffset:
+ if (!GetFixed64(&enc_slice, &record->offset)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read offset.");
+ }
+ break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ io_op_data &= (io_op_data - 1);
+ }
+
+ if (!GetFixed64(&enc_slice, &record->trace_data)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to read trace op.");
+ }
+ int64_t trace_data = static_cast<int64_t>(record->trace_data);
+ while (trace_data) {
+ // Find the rightmost set bit.
+ uint32_t set_pos = static_cast<uint32_t>(log2(trace_data & -trace_data));
+ switch (set_pos) {
+ case IODebugContext::TraceData::kRequestID: {
+ Slice request_id;
+ if (!GetLengthPrefixedSlice(&enc_slice, &request_id)) {
+ return Status::Incomplete(
+ "Incomplete access record: Failed to request id.");
+ }
+ record->request_id = request_id.ToString();
+ } break;
+ default:
+ assert(false);
+ }
+ // unset the rightmost bit.
+ trace_data &= (trace_data - 1);
+ }
+
+ return Status::OK();
+}
+
+IOTracer::IOTracer() : tracing_enabled(false) { writer_.store(nullptr); }
+
+IOTracer::~IOTracer() { EndIOTrace(); }
+
+Status IOTracer::StartIOTrace(SystemClock* clock,
+ const TraceOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer) {
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (writer_.load()) {
+ return Status::Busy();
+ }
+ trace_options_ = trace_options;
+ writer_.store(
+ new IOTraceWriter(clock, trace_options, std::move(trace_writer)));
+ tracing_enabled = true;
+ return writer_.load()->WriteHeader();
+}
+
+void IOTracer::EndIOTrace() {
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (!writer_.load()) {
+ return;
+ }
+ delete writer_.load();
+ writer_.store(nullptr);
+ tracing_enabled = false;
+}
+
+void IOTracer::WriteIOOp(const IOTraceRecord& record, IODebugContext* dbg) {
+ if (!writer_.load()) {
+ return;
+ }
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (!writer_.load()) {
+ return;
+ }
+ writer_.load()->WriteIOOp(record, dbg).PermitUncheckedError();
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/io_tracer.h b/src/rocksdb/trace_replay/io_tracer.h
new file mode 100644
index 000000000..3fc7cdba0
--- /dev/null
+++ b/src/rocksdb/trace_replay/io_tracer.h
@@ -0,0 +1,185 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <atomic>
+#include <fstream>
+
+#include "monitoring/instrumented_mutex.h"
+#include "port/lang.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/options.h"
+#include "rocksdb/trace_record.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+class SystemClock;
+class TraceReader;
+class TraceWriter;
+
+/* In order to log new data in trace record for specified operations, do
+ following:
+ 1. Add new data in IOTraceOP (say kIONewData= 3)
+ 2. Log it in IOTraceWriter::WriteIOOp, and read that in
+ IOTraceReader::ReadIOOp and
+ IOTraceRecordParser::PrintHumanReadableIOTraceRecord in the switch case.
+ 3. In the FileSystemTracer APIs where this data will be logged with, update
+ io_op_data |= (1 << IOTraceOp::kIONewData).
+*/
+enum IOTraceOp : char {
+ // The value of each enum represents the bitwise position for
+ // IOTraceRecord.io_op_data.
+ kIOFileSize = 0,
+ kIOLen = 1,
+ kIOOffset = 2,
+};
+
+struct IOTraceRecord {
+ // Required fields for all accesses.
+ uint64_t access_timestamp = 0;
+ TraceType trace_type = TraceType::kTraceMax;
+ // Each bit in io_op_data stores which corresponding info from IOTraceOp will
+ // be added in the trace. Foreg, if bit at position 1 is set then
+ // IOTraceOp::kIOLen (length) will be logged in the record.
+ uint64_t io_op_data = 0;
+ std::string file_operation;
+ uint64_t latency = 0;
+ std::string io_status;
+ // Stores file name instead of full path.
+ std::string file_name;
+
+ // Fields added to record based on IO operation.
+ uint64_t len = 0;
+ uint64_t offset = 0;
+ uint64_t file_size = 0;
+
+ // Additional information passed in IODebugContext.
+ uint64_t trace_data = 0;
+ std::string request_id;
+
+ IOTraceRecord() {}
+
+ IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
+ const uint64_t& _io_op_data, const std::string& _file_operation,
+ const uint64_t& _latency, const std::string& _io_status,
+ const std::string& _file_name, const uint64_t& _file_size = 0)
+ : access_timestamp(_access_timestamp),
+ trace_type(_trace_type),
+ io_op_data(_io_op_data),
+ file_operation(_file_operation),
+ latency(_latency),
+ io_status(_io_status),
+ file_name(_file_name),
+ file_size(_file_size) {}
+
+ IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
+ const uint64_t& _io_op_data, const std::string& _file_operation,
+ const uint64_t& _latency, const std::string& _io_status,
+ const std::string& _file_name, const uint64_t& _len,
+ const uint64_t& _offset)
+ : access_timestamp(_access_timestamp),
+ trace_type(_trace_type),
+ io_op_data(_io_op_data),
+ file_operation(_file_operation),
+ latency(_latency),
+ io_status(_io_status),
+ file_name(_file_name),
+ len(_len),
+ offset(_offset) {}
+};
+
+struct IOTraceHeader {
+ uint64_t start_time;
+ uint32_t rocksdb_major_version;
+ uint32_t rocksdb_minor_version;
+};
+
+// IOTraceWriter writes IO operation as a single trace. Each trace will have a
+// timestamp and type, followed by the trace payload.
+class IOTraceWriter {
+ public:
+ IOTraceWriter(SystemClock* clock, const TraceOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer);
+ ~IOTraceWriter() = default;
+ // No copy and move.
+ IOTraceWriter(const IOTraceWriter&) = delete;
+ IOTraceWriter& operator=(const IOTraceWriter&) = delete;
+ IOTraceWriter(IOTraceWriter&&) = delete;
+ IOTraceWriter& operator=(IOTraceWriter&&) = delete;
+
+ Status WriteIOOp(const IOTraceRecord& record, IODebugContext* dbg);
+
+ // Write a trace header at the beginning, typically on initiating a trace,
+ // with some metadata like a magic number and RocksDB version.
+ Status WriteHeader();
+
+ private:
+ SystemClock* clock_;
+ TraceOptions trace_options_;
+ std::unique_ptr<TraceWriter> trace_writer_;
+};
+
+// IOTraceReader helps read the trace file generated by IOTraceWriter.
+class IOTraceReader {
+ public:
+ explicit IOTraceReader(std::unique_ptr<TraceReader>&& reader);
+ ~IOTraceReader() = default;
+ // No copy and move.
+ IOTraceReader(const IOTraceReader&) = delete;
+ IOTraceReader& operator=(const IOTraceReader&) = delete;
+ IOTraceReader(IOTraceReader&&) = delete;
+ IOTraceReader& operator=(IOTraceReader&&) = delete;
+
+ Status ReadHeader(IOTraceHeader* header);
+
+ Status ReadIOOp(IOTraceRecord* record);
+
+ private:
+ std::unique_ptr<TraceReader> trace_reader_;
+};
+
+// An IO tracer. It uses IOTraceWriter to write the access record to the
+// trace file.
+class IOTracer {
+ public:
+ IOTracer();
+ ~IOTracer();
+ // No copy and move.
+ IOTracer(const IOTracer&) = delete;
+ IOTracer& operator=(const IOTracer&) = delete;
+ IOTracer(IOTracer&&) = delete;
+ IOTracer& operator=(IOTracer&&) = delete;
+
+ // no_sanitize is added for tracing_enabled. writer_ is protected under mutex
+ // so even if user call Start/EndIOTrace and tracing_enabled is not updated in
+ // the meanwhile, WriteIOOp will anyways check the writer_ protected under
+ // mutex and ignore the operation if writer_is null. So its ok if
+ // tracing_enabled shows non updated value.
+
+ // Start writing IO operations to the trace_writer.
+ TSAN_SUPPRESSION Status
+ StartIOTrace(SystemClock* clock, const TraceOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer);
+
+ // Stop writing IO operations to the trace_writer.
+ TSAN_SUPPRESSION void EndIOTrace();
+
+ TSAN_SUPPRESSION bool is_tracing_enabled() const { return tracing_enabled; }
+
+ void WriteIOOp(const IOTraceRecord& record, IODebugContext* dbg);
+
+ private:
+ TraceOptions trace_options_;
+ // A mutex protects the writer_.
+ InstrumentedMutex trace_writer_mutex_;
+ std::atomic<IOTraceWriter*> writer_;
+ // bool tracing_enabled is added to avoid costly operation of checking atomic
+ // variable 'writer_' is nullptr or not in is_tracing_enabled().
+ // is_tracing_enabled() is invoked multiple times by FileSystem classes.
+ bool tracing_enabled;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/io_tracer_test.cc b/src/rocksdb/trace_replay/io_tracer_test.cc
new file mode 100644
index 000000000..be3af4fb3
--- /dev/null
+++ b/src/rocksdb/trace_replay/io_tracer_test.cc
@@ -0,0 +1,353 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/io_tracer.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+const std::string kDummyFile = "/dummy/file";
+
+} // namespace
+
+class IOTracerTest : public testing::Test {
+ public:
+ IOTracerTest() {
+ test_path_ = test::PerThreadDBPath("io_tracer_test");
+ env_ = ROCKSDB_NAMESPACE::Env::Default();
+ clock_ = env_->GetSystemClock().get();
+ EXPECT_OK(env_->CreateDir(test_path_));
+ trace_file_path_ = test_path_ + "/io_trace";
+ }
+
+ ~IOTracerTest() override {
+ EXPECT_OK(env_->DeleteFile(trace_file_path_));
+ EXPECT_OK(env_->DeleteDir(test_path_));
+ }
+
+ std::string GetFileOperation(uint64_t id) {
+ id = id % 4;
+ switch (id) {
+ case 0:
+ return "CreateDir";
+ case 1:
+ return "GetChildren";
+ case 2:
+ return "FileSize";
+ case 3:
+ return "DeleteDir";
+ default:
+ assert(false);
+ }
+ return "";
+ }
+
+ void WriteIOOp(IOTraceWriter* writer, uint64_t nrecords) {
+ assert(writer);
+ for (uint64_t i = 0; i < nrecords; i++) {
+ IOTraceRecord record;
+ record.io_op_data = 0;
+ record.trace_type = TraceType::kIOTracer;
+ record.io_op_data |= (1 << IOTraceOp::kIOLen);
+ record.io_op_data |= (1 << IOTraceOp::kIOOffset);
+ record.file_operation = GetFileOperation(i);
+ record.io_status = IOStatus::OK().ToString();
+ record.file_name = kDummyFile + std::to_string(i);
+ record.len = i;
+ record.offset = i + 20;
+ EXPECT_OK(writer->WriteIOOp(record, nullptr));
+ }
+ }
+
+ void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) {
+ assert(reader);
+ for (uint32_t i = 0; i < nrecords; i++) {
+ IOTraceRecord record;
+ ASSERT_OK(reader->ReadIOOp(&record));
+ ASSERT_EQ(record.file_operation, GetFileOperation(i));
+ ASSERT_EQ(record.io_status, IOStatus::OK().ToString());
+ ASSERT_EQ(record.len, i);
+ ASSERT_EQ(record.offset, i + 20);
+ }
+ }
+
+ Env* env_;
+ SystemClock* clock_;
+ EnvOptions env_options_;
+ std::string trace_file_path_;
+ std::string test_path_;
+};
+
+TEST_F(IOTracerTest, MultipleRecordsWithDifferentIOOpOptions) {
+ std::string file_name = kDummyFile + std::to_string(5);
+ {
+ TraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ IOTracer writer;
+ ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer)));
+
+ // Write general record.
+ IOTraceRecord record0(0, TraceType::kIOTracer, 0 /*io_op_data*/,
+ GetFileOperation(0), 155 /*latency*/,
+ IOStatus::OK().ToString(), file_name);
+ writer.WriteIOOp(record0, nullptr);
+
+ // Write record with FileSize.
+ uint64_t io_op_data = 0;
+ io_op_data |= (1 << IOTraceOp::kIOFileSize);
+ IOTraceRecord record1(0, TraceType::kIOTracer, io_op_data,
+ GetFileOperation(1), 10 /*latency*/,
+ IOStatus::OK().ToString(), file_name,
+ 256 /*file_size*/);
+ writer.WriteIOOp(record1, nullptr);
+
+ // Write record with Length.
+ io_op_data = 0;
+ io_op_data |= (1 << IOTraceOp::kIOLen);
+ IOTraceRecord record2(0, TraceType::kIOTracer, io_op_data,
+ GetFileOperation(2), 10 /*latency*/,
+ IOStatus::OK().ToString(), file_name, 100 /*length*/,
+ 200 /*offset*/);
+ writer.WriteIOOp(record2, nullptr);
+
+ // Write record with Length and offset.
+ io_op_data = 0;
+ io_op_data |= (1 << IOTraceOp::kIOLen);
+ io_op_data |= (1 << IOTraceOp::kIOOffset);
+ IOTraceRecord record3(0, TraceType::kIOTracer, io_op_data,
+ GetFileOperation(3), 10 /*latency*/,
+ IOStatus::OK().ToString(), file_name, 120 /*length*/,
+ 17 /*offset*/);
+ writer.WriteIOOp(record3, nullptr);
+
+ // Write record with offset.
+ io_op_data = 0;
+ io_op_data |= (1 << IOTraceOp::kIOOffset);
+ IOTraceRecord record4(0, TraceType::kIOTracer, io_op_data,
+ GetFileOperation(4), 10 /*latency*/,
+ IOStatus::OK().ToString(), file_name, 13 /*length*/,
+ 50 /*offset*/);
+ writer.WriteIOOp(record4, nullptr);
+
+ // Write record with IODebugContext.
+ io_op_data = 0;
+ IODebugContext dbg;
+ dbg.SetRequestId("request_id_1");
+ IOTraceRecord record5(0, TraceType::kIOTracer, io_op_data,
+ GetFileOperation(5), 10 /*latency*/,
+ IOStatus::OK().ToString(), file_name);
+ writer.WriteIOOp(record5, &dbg);
+
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file is generated correctly.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ IOTraceReader reader(std::move(trace_reader));
+ IOTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+
+ // Read general record.
+ IOTraceRecord record0;
+ ASSERT_OK(reader.ReadIOOp(&record0));
+ ASSERT_EQ(record0.file_operation, GetFileOperation(0));
+ ASSERT_EQ(record0.latency, 155);
+ ASSERT_EQ(record0.file_name, file_name);
+
+ // Read record with FileSize.
+ IOTraceRecord record1;
+ ASSERT_OK(reader.ReadIOOp(&record1));
+ ASSERT_EQ(record1.file_size, 256);
+ ASSERT_EQ(record1.len, 0);
+ ASSERT_EQ(record1.offset, 0);
+
+ // Read record with Length.
+ IOTraceRecord record2;
+ ASSERT_OK(reader.ReadIOOp(&record2));
+ ASSERT_EQ(record2.len, 100);
+ ASSERT_EQ(record2.file_size, 0);
+ ASSERT_EQ(record2.offset, 0);
+
+ // Read record with Length and offset.
+ IOTraceRecord record3;
+ ASSERT_OK(reader.ReadIOOp(&record3));
+ ASSERT_EQ(record3.len, 120);
+ ASSERT_EQ(record3.file_size, 0);
+ ASSERT_EQ(record3.offset, 17);
+
+ // Read record with offset.
+ IOTraceRecord record4;
+ ASSERT_OK(reader.ReadIOOp(&record4));
+ ASSERT_EQ(record4.len, 0);
+ ASSERT_EQ(record4.file_size, 0);
+ ASSERT_EQ(record4.offset, 50);
+
+ IOTraceRecord record5;
+ ASSERT_OK(reader.ReadIOOp(&record5));
+ ASSERT_EQ(record5.len, 0);
+ ASSERT_EQ(record5.file_size, 0);
+ ASSERT_EQ(record5.offset, 0);
+ ASSERT_EQ(record5.request_id, "request_id_1");
+ // Read one more record and it should report error.
+ IOTraceRecord record6;
+ ASSERT_NOK(reader.ReadIOOp(&record6));
+ }
+}
+
+TEST_F(IOTracerTest, AtomicWrite) {
+ std::string file_name = kDummyFile + std::to_string(0);
+ {
+ IOTraceRecord record(0, TraceType::kIOTracer, 0 /*io_op_data*/,
+ GetFileOperation(0), 10 /*latency*/,
+ IOStatus::OK().ToString(), file_name);
+ TraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ IOTracer writer;
+ ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer)));
+ writer.WriteIOOp(record, nullptr);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file contains one record.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ IOTraceReader reader(std::move(trace_reader));
+ IOTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+ // Read record and verify data.
+ IOTraceRecord access_record;
+ ASSERT_OK(reader.ReadIOOp(&access_record));
+ ASSERT_EQ(access_record.file_operation, GetFileOperation(0));
+ ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString());
+ ASSERT_EQ(access_record.file_name, file_name);
+ ASSERT_NOK(reader.ReadIOOp(&access_record));
+ }
+}
+
+TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) {
+ std::string file_name = kDummyFile + std::to_string(0);
+ {
+ IOTraceRecord record(0, TraceType::kIOTracer, 0 /*io_op_data*/,
+ GetFileOperation(0), 0, IOStatus::OK().ToString(),
+ file_name);
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ IOTracer writer;
+ // The record should not be written to the trace_file since StartIOTrace is
+ // not called.
+ writer.WriteIOOp(record, nullptr);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file contains nothing.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ IOTraceReader reader(std::move(trace_reader));
+ IOTraceHeader header;
+ ASSERT_NOK(reader.ReadHeader(&header));
+ }
+}
+
+TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) {
+ std::string file_name = kDummyFile + std::to_string(0);
+ {
+ uint64_t io_op_data = 0;
+ io_op_data |= (1 << IOTraceOp::kIOFileSize);
+ IOTraceRecord record(
+ 0, TraceType::kIOTracer, io_op_data, GetFileOperation(2), 0 /*latency*/,
+ IOStatus::OK().ToString(), file_name, 10 /*file_size*/);
+ TraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ IOTracer writer;
+ ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer)));
+ writer.WriteIOOp(record, nullptr);
+ writer.EndIOTrace();
+ // Write the record again. This time the record should not be written since
+ // EndIOTrace is called.
+ writer.WriteIOOp(record, nullptr);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Verify trace file contains one record.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ IOTraceReader reader(std::move(trace_reader));
+ IOTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+
+ IOTraceRecord access_record;
+ ASSERT_OK(reader.ReadIOOp(&access_record));
+ ASSERT_EQ(access_record.file_operation, GetFileOperation(2));
+ ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString());
+ ASSERT_EQ(access_record.file_size, 10);
+ // No more record.
+ ASSERT_NOK(reader.ReadIOOp(&access_record));
+ }
+}
+
+TEST_F(IOTracerTest, AtomicMultipleWrites) {
+ {
+ TraceOptions trace_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ IOTraceWriter writer(clock_, trace_opt, std::move(trace_writer));
+ ASSERT_OK(writer.WriteHeader());
+ // Write 10 records
+ WriteIOOp(&writer, 10);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+
+ {
+ // Verify trace file is generated correctly.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ IOTraceReader reader(std::move(trace_reader));
+ IOTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
+ ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
+ // Read 10 records.
+ VerifyIOOp(&reader, 10);
+ // Read one more and record and it should report error.
+ IOTraceRecord record;
+ ASSERT_NOK(reader.ReadIOOp(&record));
+ }
+}
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/trace_replay/trace_record.cc b/src/rocksdb/trace_replay/trace_record.cc
new file mode 100644
index 000000000..21df0275d
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_record.cc
@@ -0,0 +1,206 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/trace_record.h"
+
+#include <utility>
+
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record_result.h"
+#include "trace_replay/trace_record_handler.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecord
+TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
+
+uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
+
+TraceRecord::Handler* TraceRecord::NewExecutionHandler(
+ DB* db, const std::vector<ColumnFamilyHandle*>& handles) {
+ return new TraceExecutionHandler(db, handles);
+}
+
+// QueryTraceRecord
+QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
+ : TraceRecord(timestamp) {}
+
+// WriteQueryTraceRecord
+WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp), rep_(std::move(write_batch_rep)) {}
+
+WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp) {
+ rep_.PinSelf(write_batch_rep);
+}
+
+WriteQueryTraceRecord::~WriteQueryTraceRecord() { rep_.clear(); }
+
+Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
+
+Status WriteQueryTraceRecord::Accept(
+ Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
+ assert(handler != nullptr);
+ return handler->Handle(*this, result);
+}
+
+// GetQueryTraceRecord
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
+ PinnableSlice&& key,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp),
+ cf_id_(column_family_id),
+ key_(std::move(key)) {}
+
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
+ const std::string& key,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp), cf_id_(column_family_id) {
+ key_.PinSelf(key);
+}
+
+GetQueryTraceRecord::~GetQueryTraceRecord() { key_.clear(); }
+
+uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
+
+Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
+
+Status GetQueryTraceRecord::Accept(Handler* handler,
+ std::unique_ptr<TraceRecordResult>* result) {
+ assert(handler != nullptr);
+ return handler->Handle(*this, result);
+}
+
+// IteratorQueryTraceRecord
+IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
+ : QueryTraceRecord(timestamp) {}
+
+IteratorQueryTraceRecord::IteratorQueryTraceRecord(PinnableSlice&& lower_bound,
+ PinnableSlice&& upper_bound,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp),
+ lower_(std::move(lower_bound)),
+ upper_(std::move(upper_bound)) {}
+
+IteratorQueryTraceRecord::IteratorQueryTraceRecord(
+ const std::string& lower_bound, const std::string& upper_bound,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp) {
+ lower_.PinSelf(lower_bound);
+ upper_.PinSelf(upper_bound);
+}
+
+IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {}
+
+Slice IteratorQueryTraceRecord::GetLowerBound() const { return Slice(lower_); }
+
+Slice IteratorQueryTraceRecord::GetUpperBound() const { return Slice(upper_); }
+
+// IteratorSeekQueryTraceRecord
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+ SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
+ uint64_t timestamp)
+ : IteratorQueryTraceRecord(timestamp),
+ type_(seek_type),
+ cf_id_(column_family_id),
+ key_(std::move(key)) {}
+
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+ SeekType seek_type, uint32_t column_family_id, const std::string& key,
+ uint64_t timestamp)
+ : IteratorQueryTraceRecord(timestamp),
+ type_(seek_type),
+ cf_id_(column_family_id) {
+ key_.PinSelf(key);
+}
+
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+ SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
+ PinnableSlice&& lower_bound, PinnableSlice&& upper_bound,
+ uint64_t timestamp)
+ : IteratorQueryTraceRecord(std::move(lower_bound), std::move(upper_bound),
+ timestamp),
+ type_(seek_type),
+ cf_id_(column_family_id),
+ key_(std::move(key)) {}
+
+IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
+ SeekType seek_type, uint32_t column_family_id, const std::string& key,
+ const std::string& lower_bound, const std::string& upper_bound,
+ uint64_t timestamp)
+ : IteratorQueryTraceRecord(lower_bound, upper_bound, timestamp),
+ type_(seek_type),
+ cf_id_(column_family_id) {
+ key_.PinSelf(key);
+}
+
+IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); }
+
+TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
+ return static_cast<TraceType>(type_);
+}
+
+IteratorSeekQueryTraceRecord::SeekType
+IteratorSeekQueryTraceRecord::GetSeekType() const {
+ return type_;
+}
+
+uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const {
+ return cf_id_;
+}
+
+Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
+
+Status IteratorSeekQueryTraceRecord::Accept(
+ Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
+ assert(handler != nullptr);
+ return handler->Handle(*this, result);
+}
+
+// MultiGetQueryTraceRecord
+MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
+ std::vector<uint32_t> column_family_ids, std::vector<PinnableSlice>&& keys,
+ uint64_t timestamp)
+ : QueryTraceRecord(timestamp),
+ cf_ids_(column_family_ids),
+ keys_(std::move(keys)) {}
+
+MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
+ std::vector<uint32_t> column_family_ids,
+ const std::vector<std::string>& keys, uint64_t timestamp)
+ : QueryTraceRecord(timestamp), cf_ids_(column_family_ids) {
+ keys_.reserve(keys.size());
+ for (const std::string& key : keys) {
+ PinnableSlice ps;
+ ps.PinSelf(key);
+ keys_.push_back(std::move(ps));
+ }
+}
+
+MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {
+ cf_ids_.clear();
+ keys_.clear();
+}
+
+std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
+ return cf_ids_;
+}
+
+std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const {
+ return std::vector<Slice>(keys_.begin(), keys_.end());
+}
+
+Status MultiGetQueryTraceRecord::Accept(
+ Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
+ assert(handler != nullptr);
+ return handler->Handle(*this, result);
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/trace_record_handler.cc b/src/rocksdb/trace_replay/trace_record_handler.cc
new file mode 100644
index 000000000..ca179e870
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_record_handler.cc
@@ -0,0 +1,190 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/trace_record_handler.h"
+
+#include "rocksdb/iterator.h"
+#include "rocksdb/trace_record_result.h"
+#include "rocksdb/write_batch.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceExecutionHandler
+TraceExecutionHandler::TraceExecutionHandler(
+ DB* db, const std::vector<ColumnFamilyHandle*>& handles)
+ : TraceRecord::Handler(),
+ db_(db),
+ write_opts_(WriteOptions()),
+ read_opts_(ReadOptions()) {
+ assert(db != nullptr);
+ assert(!handles.empty());
+ cf_map_.reserve(handles.size());
+ for (ColumnFamilyHandle* handle : handles) {
+ assert(handle != nullptr);
+ cf_map_.insert({handle->GetID(), handle});
+ }
+ clock_ = db_->GetEnv()->GetSystemClock().get();
+}
+
+TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
+
+Status TraceExecutionHandler::Handle(
+ const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
+ uint64_t start = clock_->NowMicros();
+
+ WriteBatch batch(record.GetWriteBatchRep().ToString());
+ Status s = db_->Write(write_opts_, &batch);
+
+ uint64_t end = clock_->NowMicros();
+
+ if (s.ok() && result != nullptr) {
+ result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
+ record.GetTraceType()));
+ }
+
+ return s;
+}
+
+Status TraceExecutionHandler::Handle(
+ const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
+ auto it = cf_map_.find(record.GetColumnFamilyID());
+ if (it == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+
+ uint64_t start = clock_->NowMicros();
+
+ std::string value;
+ Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
+
+ uint64_t end = clock_->NowMicros();
+
+ // Treat not found as ok, return other errors.
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (result != nullptr) {
+ // Report the actual opetation status in TraceExecutionResult
+ result->reset(new SingleValueTraceExecutionResult(
+ std::move(s), std::move(value), start, end, record.GetTraceType()));
+ }
+ return Status::OK();
+}
+
+Status TraceExecutionHandler::Handle(
+ const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
+ auto it = cf_map_.find(record.GetColumnFamilyID());
+ if (it == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+
+ ReadOptions r_opts = read_opts_;
+ Slice lower = record.GetLowerBound();
+ if (!lower.empty()) {
+ r_opts.iterate_lower_bound = &lower;
+ }
+ Slice upper = record.GetUpperBound();
+ if (!upper.empty()) {
+ r_opts.iterate_upper_bound = &upper;
+ }
+ Iterator* single_iter = db_->NewIterator(r_opts, it->second);
+
+ uint64_t start = clock_->NowMicros();
+
+ switch (record.GetSeekType()) {
+ case IteratorSeekQueryTraceRecord::kSeekForPrev: {
+ single_iter->SeekForPrev(record.GetKey());
+ break;
+ }
+ default: {
+ single_iter->Seek(record.GetKey());
+ break;
+ }
+ }
+
+ uint64_t end = clock_->NowMicros();
+
+ Status s = single_iter->status();
+ if (s.ok() && result != nullptr) {
+ if (single_iter->Valid()) {
+ PinnableSlice ps_key;
+ ps_key.PinSelf(single_iter->key());
+ PinnableSlice ps_value;
+ ps_value.PinSelf(single_iter->value());
+ result->reset(new IteratorTraceExecutionResult(
+ true, s, std::move(ps_key), std::move(ps_value), start, end,
+ record.GetTraceType()));
+ } else {
+ result->reset(new IteratorTraceExecutionResult(
+ false, s, "", "", start, end, record.GetTraceType()));
+ }
+ }
+ delete single_iter;
+
+ return s;
+}
+
+Status TraceExecutionHandler::Handle(
+ const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) {
+ if (result != nullptr) {
+ result->reset(nullptr);
+ }
+ std::vector<ColumnFamilyHandle*> handles;
+ handles.reserve(record.GetColumnFamilyIDs().size());
+ for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
+ auto it = cf_map_.find(cf_id);
+ if (it == cf_map_.end()) {
+ return Status::Corruption("Invalid Column Family ID.");
+ }
+ handles.push_back(it->second);
+ }
+
+ std::vector<Slice> keys = record.GetKeys();
+
+ if (handles.empty() || keys.empty()) {
+ return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+ }
+ if (handles.size() != keys.size()) {
+ return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
+ }
+
+ uint64_t start = clock_->NowMicros();
+
+ std::vector<std::string> values;
+ std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
+
+ uint64_t end = clock_->NowMicros();
+
+ // Treat not found as ok, return other errors.
+ for (const Status& s : ss) {
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ }
+
+ if (result != nullptr) {
+ // Report the actual opetation status in TraceExecutionResult
+ result->reset(new MultiValuesTraceExecutionResult(
+ std::move(ss), std::move(values), start, end, record.GetTraceType()));
+ }
+
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/trace_record_handler.h b/src/rocksdb/trace_replay/trace_record_handler.h
new file mode 100644
index 000000000..88cf317dd
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_record_handler.h
@@ -0,0 +1,46 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_record.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Handler to execute TraceRecord.
+class TraceExecutionHandler : public TraceRecord::Handler {
+ public:
+ TraceExecutionHandler(DB* db,
+ const std::vector<ColumnFamilyHandle*>& handles);
+ virtual ~TraceExecutionHandler() override;
+
+ virtual Status Handle(const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ virtual Status Handle(const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ virtual Status Handle(const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+
+ private:
+ DB* db_;
+ std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
+ WriteOptions write_opts_;
+ ReadOptions read_opts_;
+ SystemClock* clock_;
+};
+
+// To do: Handler for trace_analyzer.
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/trace_record_result.cc b/src/rocksdb/trace_replay/trace_record_result.cc
new file mode 100644
index 000000000..9c0cb43ad
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_record_result.cc
@@ -0,0 +1,146 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/trace_record_result.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// TraceRecordResult
+TraceRecordResult::TraceRecordResult(TraceType trace_type)
+ : trace_type_(trace_type) {}
+
+TraceType TraceRecordResult::GetTraceType() const { return trace_type_; }
+
+// TraceExecutionResult
+TraceExecutionResult::TraceExecutionResult(uint64_t start_timestamp,
+ uint64_t end_timestamp,
+ TraceType trace_type)
+ : TraceRecordResult(trace_type),
+ ts_start_(start_timestamp),
+ ts_end_(end_timestamp) {
+ assert(ts_start_ <= ts_end_);
+}
+
+uint64_t TraceExecutionResult::GetStartTimestamp() const { return ts_start_; }
+
+uint64_t TraceExecutionResult::GetEndTimestamp() const { return ts_end_; }
+
+// StatusOnlyTraceExecutionResult
+StatusOnlyTraceExecutionResult::StatusOnlyTraceExecutionResult(
+ Status status, uint64_t start_timestamp, uint64_t end_timestamp,
+ TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ status_(std::move(status)) {}
+
+const Status& StatusOnlyTraceExecutionResult::GetStatus() const {
+ return status_;
+}
+
+Status StatusOnlyTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// SingleValueTraceExecutionResult
+SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
+ Status status, const std::string& value, uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ status_(std::move(status)),
+ value_(value) {}
+
+SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
+ Status status, std::string&& value, uint64_t start_timestamp,
+ uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ status_(std::move(status)),
+ value_(std::move(value)) {}
+
+SingleValueTraceExecutionResult::~SingleValueTraceExecutionResult() {
+ value_.clear();
+}
+
+const Status& SingleValueTraceExecutionResult::GetStatus() const {
+ return status_;
+}
+
+const std::string& SingleValueTraceExecutionResult::GetValue() const {
+ return value_;
+}
+
+Status SingleValueTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// MultiValuesTraceExecutionResult
+MultiValuesTraceExecutionResult::MultiValuesTraceExecutionResult(
+ std::vector<Status> multi_status, std::vector<std::string> values,
+ uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ multi_status_(std::move(multi_status)),
+ values_(std::move(values)) {}
+
+MultiValuesTraceExecutionResult::~MultiValuesTraceExecutionResult() {
+ multi_status_.clear();
+ values_.clear();
+}
+
+const std::vector<Status>& MultiValuesTraceExecutionResult::GetMultiStatus()
+ const {
+ return multi_status_;
+}
+
+const std::vector<std::string>& MultiValuesTraceExecutionResult::GetValues()
+ const {
+ return values_;
+}
+
+Status MultiValuesTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+// IteratorTraceExecutionResult
+IteratorTraceExecutionResult::IteratorTraceExecutionResult(
+ bool valid, Status status, PinnableSlice&& key, PinnableSlice&& value,
+ uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ valid_(valid),
+ status_(std::move(status)),
+ key_(std::move(key)),
+ value_(std::move(value)) {}
+
+IteratorTraceExecutionResult::IteratorTraceExecutionResult(
+ bool valid, Status status, const std::string& key, const std::string& value,
+ uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
+ : TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
+ valid_(valid),
+ status_(std::move(status)) {
+ key_.PinSelf(key);
+ value_.PinSelf(value);
+}
+
+IteratorTraceExecutionResult::~IteratorTraceExecutionResult() {
+ key_.clear();
+ value_.clear();
+}
+
+bool IteratorTraceExecutionResult::GetValid() const { return valid_; }
+
+const Status& IteratorTraceExecutionResult::GetStatus() const {
+ return status_;
+}
+
+Slice IteratorTraceExecutionResult::GetKey() const { return Slice(key_); }
+
+Slice IteratorTraceExecutionResult::GetValue() const { return Slice(value_); }
+
+Status IteratorTraceExecutionResult::Accept(Handler* handler) {
+ assert(handler != nullptr);
+ return handler->Handle(*this);
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/trace_replay.cc b/src/rocksdb/trace_replay/trace_replay.cc
new file mode 100644
index 000000000..37b95852b
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_replay.cc
@@ -0,0 +1,622 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/trace_replay.h"
+
+#include <chrono>
+#include <sstream>
+#include <thread>
+
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/write_batch.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+const std::string kTraceMagic = "feedcafedeadbeef";
+
+namespace {
+void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
+ Slice buf(buffer);
+ GetFixed32(&buf, cf_id);
+ GetLengthPrefixedSlice(&buf, key);
+}
+} // namespace
+
+Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) {
+ if (v_string.find_first_of('.') == std::string::npos ||
+ v_string.find_first_of('.') != v_string.find_last_of('.')) {
+ return Status::Corruption(
+ "Corrupted trace file. Incorrect version format.");
+ }
+ int tmp_num = 0;
+ for (int i = 0; i < static_cast<int>(v_string.size()); i++) {
+ if (v_string[i] == '.') {
+ continue;
+ } else if (isdigit(v_string[i])) {
+ tmp_num = tmp_num * 10 + (v_string[i] - '0');
+ } else {
+ return Status::Corruption(
+ "Corrupted trace file. Incorrect version format");
+ }
+ }
+ *v_num = tmp_num;
+ return Status::OK();
+}
+
+Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version,
+ int* db_version) {
+ std::vector<std::string> s_vec;
+ int begin = 0, end;
+ for (int i = 0; i < 3; i++) {
+ assert(header.payload.find("\t", begin) != std::string::npos);
+ end = static_cast<int>(header.payload.find("\t", begin));
+ s_vec.push_back(header.payload.substr(begin, end - begin));
+ begin = end + 1;
+ }
+
+ std::string t_v_str, db_v_str;
+ assert(s_vec.size() == 3);
+ assert(s_vec[1].find("Trace Version: ") != std::string::npos);
+ t_v_str = s_vec[1].substr(15);
+ assert(s_vec[2].find("RocksDB Version: ") != std::string::npos);
+ db_v_str = s_vec[2].substr(17);
+
+ Status s;
+ s = ParseVersionStr(t_v_str, trace_version);
+ if (s != Status::OK()) {
+ return s;
+ }
+ s = ParseVersionStr(db_v_str, db_version);
+ return s;
+}
+
+void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
+ assert(encoded_trace);
+ PutFixed64(encoded_trace, trace.ts);
+ encoded_trace->push_back(trace.type);
+ PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
+ encoded_trace->append(trace.payload);
+}
+
+Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
+ Trace* trace) {
+ assert(trace != nullptr);
+ Slice enc_slice = Slice(encoded_trace);
+ if (!GetFixed64(&enc_slice, &trace->ts)) {
+ return Status::Incomplete("Decode trace string failed");
+ }
+ if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
+ return Status::Incomplete("Decode trace string failed");
+ }
+ trace->type = static_cast<TraceType>(enc_slice[0]);
+ enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
+ trace->payload = enc_slice.ToString();
+ return Status::OK();
+}
+
+Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
+ Trace* header) {
+ Status s = TracerHelper::DecodeTrace(encoded_trace, header);
+
+ if (header->type != kTraceBegin) {
+ return Status::Corruption("Corrupted trace file. Incorrect header.");
+ }
+ if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
+ return Status::Corruption("Corrupted trace file. Incorrect magic.");
+ }
+
+ return s;
+}
+
+bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
+ const TracePayloadType payload_type) {
+ uint64_t old_state = payload_map;
+ uint64_t tmp = 1;
+ payload_map |= (tmp << payload_type);
+ return old_state != payload_map;
+}
+
+Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record) {
+ assert(trace != nullptr);
+
+ if (record != nullptr) {
+ record->reset(nullptr);
+ }
+
+ switch (trace->type) {
+ // Write
+ case kTraceWrite: {
+ PinnableSlice rep;
+ if (trace_file_version < 2) {
+ rep.PinSelf(trace->payload);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ Slice write_batch_data;
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kWriteBatchData: {
+ GetLengthPrefixedSlice(&buf, &write_batch_data);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ rep.PinSelf(write_batch_data);
+ }
+
+ if (record != nullptr) {
+ record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
+ }
+
+ return Status::OK();
+ }
+ // Get
+ case kTraceGet: {
+ uint32_t cf_id = 0;
+ Slice get_key;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &get_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kGetCFID: {
+ GetFixed32(&buf, &cf_id);
+ break;
+ }
+ case TracePayloadType::kGetKey: {
+ GetLengthPrefixedSlice(&buf, &get_key);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ }
+
+ if (record != nullptr) {
+ PinnableSlice ps;
+ ps.PinSelf(get_key);
+ record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
+ }
+
+ return Status::OK();
+ }
+ // Iterator Seek and SeekForPrev
+ case kTraceIteratorSeek:
+ case kTraceIteratorSeekForPrev: {
+ uint32_t cf_id = 0;
+ Slice iter_key;
+ Slice lower_bound;
+ Slice upper_bound;
+
+ if (trace_file_version < 2) {
+ DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
+ } else {
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kIterCFID: {
+ GetFixed32(&buf, &cf_id);
+ break;
+ }
+ case TracePayloadType::kIterKey: {
+ GetLengthPrefixedSlice(&buf, &iter_key);
+ break;
+ }
+ case TracePayloadType::kIterLowerBound: {
+ GetLengthPrefixedSlice(&buf, &lower_bound);
+ break;
+ }
+ case TracePayloadType::kIterUpperBound: {
+ GetLengthPrefixedSlice(&buf, &upper_bound);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ }
+
+ if (record != nullptr) {
+ PinnableSlice ps_key;
+ ps_key.PinSelf(iter_key);
+ PinnableSlice ps_lower;
+ ps_lower.PinSelf(lower_bound);
+ PinnableSlice ps_upper;
+ ps_upper.PinSelf(upper_bound);
+ record->reset(new IteratorSeekQueryTraceRecord(
+ static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type),
+ cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper),
+ trace->ts));
+ }
+
+ return Status::OK();
+ }
+ // MultiGet
+ case kTraceMultiGet: {
+ if (trace_file_version < 2) {
+ return Status::Corruption("MultiGet is not supported.");
+ }
+
+ uint32_t multiget_size = 0;
+ std::vector<uint32_t> cf_ids;
+ std::vector<PinnableSlice> multiget_keys;
+
+ Slice cfids_payload;
+ Slice keys_payload;
+ Slice buf(trace->payload);
+ GetFixed64(&buf, &trace->payload_map);
+ int64_t payload_map = static_cast<int64_t>(trace->payload_map);
+ while (payload_map) {
+ // Find the rightmost set bit.
+ uint32_t set_pos =
+ static_cast<uint32_t>(log2(payload_map & -payload_map));
+ switch (set_pos) {
+ case TracePayloadType::kMultiGetSize: {
+ GetFixed32(&buf, &multiget_size);
+ break;
+ }
+ case TracePayloadType::kMultiGetCFIDs: {
+ GetLengthPrefixedSlice(&buf, &cfids_payload);
+ break;
+ }
+ case TracePayloadType::kMultiGetKeys: {
+ GetLengthPrefixedSlice(&buf, &keys_payload);
+ break;
+ }
+ default: {
+ assert(false);
+ }
+ }
+ // unset the rightmost bit.
+ payload_map &= (payload_map - 1);
+ }
+ if (multiget_size == 0) {
+ return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
+ }
+
+ // Decode the cfids_payload and keys_payload
+ cf_ids.reserve(multiget_size);
+ multiget_keys.reserve(multiget_size);
+ for (uint32_t i = 0; i < multiget_size; i++) {
+ uint32_t tmp_cfid;
+ Slice tmp_key;
+ GetFixed32(&cfids_payload, &tmp_cfid);
+ GetLengthPrefixedSlice(&keys_payload, &tmp_key);
+ cf_ids.push_back(tmp_cfid);
+ Slice s(tmp_key);
+ PinnableSlice ps;
+ ps.PinSelf(s);
+ multiget_keys.push_back(std::move(ps));
+ }
+
+ if (record != nullptr) {
+ record->reset(new MultiGetQueryTraceRecord(
+ std::move(cf_ids), std::move(multiget_keys), trace->ts));
+ }
+
+ return Status::OK();
+ }
+ default:
+ return Status::NotSupported("Unsupported trace type.");
+ }
+}
+
+Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer)
+ : clock_(clock),
+ trace_options_(trace_options),
+ trace_writer_(std::move(trace_writer)),
+ trace_request_count_(0) {
+ // TODO: What if this fails?
+ WriteHeader().PermitUncheckedError();
+}
+
+Tracer::~Tracer() { trace_writer_.reset(); }
+
+Status Tracer::Write(WriteBatch* write_batch) {
+ TraceType trace_type = kTraceWrite;
+ if (ShouldSkipTrace(trace_type)) {
+ return Status::OK();
+ }
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = trace_type;
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kWriteBatchData);
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data()));
+ return WriteTrace(trace);
+}
+
+Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
+ TraceType trace_type = kTraceGet;
+ if (ShouldSkipTrace(trace_type)) {
+ return Status::OK();
+ }
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = trace_type;
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID);
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey);
+ // Encode the Get struct members into payload. Make sure add them in order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, column_family->GetID());
+ PutLengthPrefixedSlice(&trace.payload, key);
+ return WriteTrace(trace);
+}
+
+Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key,
+ const Slice& lower_bound, const Slice upper_bound) {
+ TraceType trace_type = kTraceIteratorSeek;
+ if (ShouldSkipTrace(trace_type)) {
+ return Status::OK();
+ }
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = trace_type;
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
+ if (lower_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterLowerBound);
+ }
+ if (upper_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterUpperBound);
+ }
+ // Encode the Iterator struct members into payload. Make sure add them in
+ // order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, cf_id);
+ PutLengthPrefixedSlice(&trace.payload, key);
+ if (lower_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, lower_bound);
+ }
+ if (upper_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, upper_bound);
+ }
+ return WriteTrace(trace);
+}
+
+Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
+ const Slice& lower_bound,
+ const Slice upper_bound) {
+ TraceType trace_type = kTraceIteratorSeekForPrev;
+ if (ShouldSkipTrace(trace_type)) {
+ return Status::OK();
+ }
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = trace_type;
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID);
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey);
+ if (lower_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterLowerBound);
+ }
+ if (upper_bound.size() > 0) {
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kIterUpperBound);
+ }
+ // Encode the Iterator struct members into payload. Make sure add them in
+ // order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, cf_id);
+ PutLengthPrefixedSlice(&trace.payload, key);
+ if (lower_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, lower_bound);
+ }
+ if (upper_bound.size() > 0) {
+ PutLengthPrefixedSlice(&trace.payload, upper_bound);
+ }
+ return WriteTrace(trace);
+}
+
+Status Tracer::MultiGet(const size_t num_keys,
+ ColumnFamilyHandle** column_families,
+ const Slice* keys) {
+ if (num_keys == 0) {
+ return Status::OK();
+ }
+ std::vector<ColumnFamilyHandle*> v_column_families;
+ std::vector<Slice> v_keys;
+ v_column_families.resize(num_keys);
+ v_keys.resize(num_keys);
+ for (size_t i = 0; i < num_keys; i++) {
+ v_column_families[i] = column_families[i];
+ v_keys[i] = keys[i];
+ }
+ return MultiGet(v_column_families, v_keys);
+}
+
+Status Tracer::MultiGet(const size_t num_keys,
+ ColumnFamilyHandle* column_family, const Slice* keys) {
+ if (num_keys == 0) {
+ return Status::OK();
+ }
+ std::vector<ColumnFamilyHandle*> column_families;
+ std::vector<Slice> v_keys;
+ column_families.resize(num_keys);
+ v_keys.resize(num_keys);
+ for (size_t i = 0; i < num_keys; i++) {
+ column_families[i] = column_family;
+ v_keys[i] = keys[i];
+ }
+ return MultiGet(column_families, v_keys);
+}
+
+Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families,
+ const std::vector<Slice>& keys) {
+ if (column_families.size() != keys.size()) {
+ return Status::Corruption("the CFs size and keys size does not match!");
+ }
+ TraceType trace_type = kTraceMultiGet;
+ if (ShouldSkipTrace(trace_type)) {
+ return Status::OK();
+ }
+ uint32_t multiget_size = static_cast<uint32_t>(keys.size());
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = trace_type;
+ // Set the payloadmap of the struct member that will be encoded in the
+ // payload.
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kMultiGetSize);
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kMultiGetCFIDs);
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kMultiGetKeys);
+ // Encode the CFIDs inorder
+ std::string cfids_payload;
+ std::string keys_payload;
+ for (uint32_t i = 0; i < multiget_size; i++) {
+ assert(i < column_families.size());
+ assert(i < keys.size());
+ PutFixed32(&cfids_payload, column_families[i]->GetID());
+ PutLengthPrefixedSlice(&keys_payload, keys[i]);
+ }
+ // Encode the Get struct members into payload. Make sure add them in order.
+ PutFixed64(&trace.payload, trace.payload_map);
+ PutFixed32(&trace.payload, multiget_size);
+ PutLengthPrefixedSlice(&trace.payload, cfids_payload);
+ PutLengthPrefixedSlice(&trace.payload, keys_payload);
+ return WriteTrace(trace);
+}
+
+bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
+ if (IsTraceFileOverMax()) {
+ return true;
+ }
+
+ TraceFilterType filter_mask = kTraceFilterNone;
+ switch (trace_type) {
+ case kTraceNone:
+ case kTraceBegin:
+ case kTraceEnd:
+ filter_mask = kTraceFilterNone;
+ break;
+ case kTraceWrite:
+ filter_mask = kTraceFilterWrite;
+ break;
+ case kTraceGet:
+ filter_mask = kTraceFilterGet;
+ break;
+ case kTraceIteratorSeek:
+ filter_mask = kTraceFilterIteratorSeek;
+ break;
+ case kTraceIteratorSeekForPrev:
+ filter_mask = kTraceFilterIteratorSeekForPrev;
+ break;
+ case kBlockTraceIndexBlock:
+ case kBlockTraceFilterBlock:
+ case kBlockTraceDataBlock:
+ case kBlockTraceUncompressionDictBlock:
+ case kBlockTraceRangeDeletionBlock:
+ case kIOTracer:
+ filter_mask = kTraceFilterNone;
+ break;
+ case kTraceMultiGet:
+ filter_mask = kTraceFilterMultiGet;
+ break;
+ case kTraceMax:
+ assert(false);
+ filter_mask = kTraceFilterNone;
+ break;
+ }
+ if (filter_mask != kTraceFilterNone && trace_options_.filter & filter_mask) {
+ return true;
+ }
+
+ ++trace_request_count_;
+ if (trace_request_count_ < trace_options_.sampling_frequency) {
+ return true;
+ }
+ trace_request_count_ = 0;
+ return false;
+}
+
+bool Tracer::IsTraceFileOverMax() {
+ uint64_t trace_file_size = trace_writer_->GetFileSize();
+ return (trace_file_size > trace_options_.max_trace_file_size);
+}
+
+Status Tracer::WriteHeader() {
+ std::ostringstream s;
+ s << kTraceMagic << "\t"
+ << "Trace Version: " << kTraceFileMajorVersion << "."
+ << kTraceFileMinorVersion << "\t"
+ << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
+ << "Format: Timestamp OpType Payload\n";
+ std::string header(s.str());
+
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = kTraceBegin;
+ trace.payload = header;
+ return WriteTrace(trace);
+}
+
+Status Tracer::WriteFooter() {
+ Trace trace;
+ trace.ts = clock_->NowMicros();
+ trace.type = kTraceEnd;
+ TracerHelper::SetPayloadMap(trace.payload_map,
+ TracePayloadType::kEmptyPayload);
+ trace.payload = "";
+ return WriteTrace(trace);
+}
+
+Status Tracer::WriteTrace(const Trace& trace) {
+ std::string encoded_trace;
+ TracerHelper::EncodeTrace(trace, &encoded_trace);
+ return trace_writer_->Write(Slice(encoded_trace));
+}
+
+Status Tracer::Close() { return WriteFooter(); }
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/trace_replay.h b/src/rocksdb/trace_replay/trace_replay.h
new file mode 100644
index 000000000..9aba5ceb7
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_replay.h
@@ -0,0 +1,183 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+
+#include "rocksdb/options.h"
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/utilities/replayer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// This file contains Tracer and Replayer classes that enable capturing and
+// replaying RocksDB traces.
+
+class ColumnFamilyHandle;
+class ColumnFamilyData;
+class DB;
+class DBImpl;
+class Env;
+class Slice;
+class SystemClock;
+class TraceReader;
+class TraceWriter;
+class WriteBatch;
+
+struct ReadOptions;
+struct TraceOptions;
+struct WriteOptions;
+
+extern const std::string kTraceMagic;
+const unsigned int kTraceTimestampSize = 8;
+const unsigned int kTraceTypeSize = 1;
+const unsigned int kTracePayloadLengthSize = 4;
+const unsigned int kTraceMetadataSize =
+ kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize;
+
+static const int kTraceFileMajorVersion = 0;
+static const int kTraceFileMinorVersion = 2;
+
+// The data structure that defines a single trace.
+struct Trace {
+ uint64_t ts; // timestamp
+ TraceType type;
+ // Each bit in payload_map stores which corresponding struct member added in
+ // the payload. Each TraceType has its corresponding payload struct. For
+ // example, if bit at position 0 is set in write payload, then the write batch
+ // will be addedd.
+ uint64_t payload_map = 0;
+ // Each trace type has its own payload_struct, which will be serilized in the
+ // payload.
+ std::string payload;
+
+ void reset() {
+ ts = 0;
+ type = kTraceMax;
+ payload_map = 0;
+ payload.clear();
+ }
+};
+
+enum TracePayloadType : char {
+ // Each member of all query payload structs should have a corresponding flag
+ // here. Make sure to add them sequentially in the order of it is added.
+ kEmptyPayload = 0,
+ kWriteBatchData = 1,
+ kGetCFID = 2,
+ kGetKey = 3,
+ kIterCFID = 4,
+ kIterKey = 5,
+ kIterLowerBound = 6,
+ kIterUpperBound = 7,
+ kMultiGetSize = 8,
+ kMultiGetCFIDs = 9,
+ kMultiGetKeys = 10,
+};
+
+class TracerHelper {
+ public:
+ // Parse the string with major and minor version only
+ static Status ParseVersionStr(std::string& v_string, int* v_num);
+
+ // Parse the trace file version and db version in trace header
+ static Status ParseTraceHeader(const Trace& header, int* trace_version,
+ int* db_version);
+
+ // Encode a version 0.1 trace object into the given string.
+ static void EncodeTrace(const Trace& trace, std::string* encoded_trace);
+
+ // Decode a string into the given trace object.
+ static Status DecodeTrace(const std::string& encoded_trace, Trace* trace);
+
+ // Decode a string into the given trace header.
+ static Status DecodeHeader(const std::string& encoded_trace, Trace* header);
+
+ // Set the payload map based on the payload type
+ static bool SetPayloadMap(uint64_t& payload_map,
+ const TracePayloadType payload_type);
+
+ // Decode a Trace object into the corresponding TraceRecord.
+ // Return Status::OK() if nothing is wrong, record will be set accordingly.
+ // Return Status::NotSupported() if the trace type is not support, or the
+ // corresponding error status, record will be set to nullptr.
+ static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
+ std::unique_ptr<TraceRecord>* record);
+};
+
+// Tracer captures all RocksDB operations using a user-provided TraceWriter.
+// Every RocksDB operation is written as a single trace. Each trace will have a
+// timestamp and type, followed by the trace payload.
+class Tracer {
+ public:
+ Tracer(SystemClock* clock, const TraceOptions& trace_options,
+ std::unique_ptr<TraceWriter>&& trace_writer);
+ ~Tracer();
+
+ // Trace all write operations -- Put, Merge, Delete, SingleDelete, Write
+ Status Write(WriteBatch* write_batch);
+
+ // Trace Get operations.
+ Status Get(ColumnFamilyHandle* cfname, const Slice& key);
+
+ // Trace Iterators.
+ Status IteratorSeek(const uint32_t& cf_id, const Slice& key,
+ const Slice& lower_bound, const Slice upper_bound);
+ Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
+ const Slice& lower_bound, const Slice upper_bound);
+
+ // Trace MultiGet
+
+ Status MultiGet(const size_t num_keys, ColumnFamilyHandle** column_families,
+ const Slice* keys);
+
+ Status MultiGet(const size_t num_keys, ColumnFamilyHandle* column_family,
+ const Slice* keys);
+
+ Status MultiGet(const std::vector<ColumnFamilyHandle*>& column_family,
+ const std::vector<Slice>& keys);
+
+ // Returns true if the trace is over the configured max trace file limit.
+ // False otherwise.
+ bool IsTraceFileOverMax();
+
+ // Returns true if the order of write trace records must match the order of
+ // the corresponding records logged to WAL and applied to the DB.
+ bool IsWriteOrderPreserved() { return trace_options_.preserve_write_order; }
+
+ // Writes a trace footer at the end of the tracing
+ Status Close();
+
+ private:
+ // Write a trace header at the beginning, typically on initiating a trace,
+ // with some metadata like a magic number, trace version, RocksDB version, and
+ // trace format.
+ Status WriteHeader();
+
+ // Write a trace footer, typically on ending a trace, with some metadata.
+ Status WriteFooter();
+
+ // Write a single trace using the provided TraceWriter to the underlying
+ // system, say, a filesystem or a streaming service.
+ Status WriteTrace(const Trace& trace);
+
+ // Helps in filtering and sampling of traces.
+ // Returns true if a trace should be skipped, false otherwise.
+ bool ShouldSkipTrace(const TraceType& type);
+
+ SystemClock* clock_;
+ TraceOptions trace_options_;
+ std::unique_ptr<TraceWriter> trace_writer_;
+ uint64_t trace_request_count_;
+};
+
+} // namespace ROCKSDB_NAMESPACE