summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/trace_replay
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/trace_replay')
-rw-r--r--src/rocksdb/trace_replay/block_cache_tracer.cc497
-rw-r--r--src/rocksdb/trace_replay/block_cache_tracer.h294
-rw-r--r--src/rocksdb/trace_replay/block_cache_tracer_test.cc378
-rw-r--r--src/rocksdb/trace_replay/trace_replay.cc485
-rw-r--r--src/rocksdb/trace_replay/trace_replay.h189
5 files changed, 1843 insertions, 0 deletions
diff --git a/src/rocksdb/trace_replay/block_cache_tracer.cc b/src/rocksdb/trace_replay/block_cache_tracer.cc
new file mode 100644
index 000000000..9a96f1bac
--- /dev/null
+++ b/src/rocksdb/trace_replay/block_cache_tracer.cc
@@ -0,0 +1,497 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/block_cache_tracer.h"
+
+#include <cinttypes>
+#include <cstdio>
+#include <cstdlib>
+
+#include "db/db_impl/db_impl.h"
+#include "db/dbformat.h"
+#include "rocksdb/slice.h"
+#include "util/coding.h"
+#include "util/hash.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+const unsigned int kCharSize = 1;
+
+bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) {
+ if (trace_options.sampling_frequency == 0 ||
+ trace_options.sampling_frequency == 1) {
+ return true;
+ }
+ // We use spatial downsampling so that we have a complete access history for a
+ // block.
+ return 0 == fastrange64(GetSliceNPHash64(block_key),
+ trace_options.sampling_frequency);
+}
+} // namespace
+
// Time-unit conversion constants (declared extern in block_cache_tracer.h)
// used by trace analysis tooling.
const uint64_t kMicrosInSecond = 1000 * 1000;
const uint64_t kSecondInMinute = 60;
const uint64_t kSecondInHour = 3600;
// Placeholder column family name for records whose CF is not known.
const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName =
    "UnknownColumnFamily";
// Get id 0 is reserved to mean "no associated Get/MultiGet request".
const uint64_t BlockCacheTraceHelper::kReservedGetId = 0;
+
+bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(
+ TraceType block_type, TableReaderCaller caller) {
+ return (block_type == TraceType::kBlockTraceDataBlock) &&
+ IsGetOrMultiGet(caller);
+}
+
+bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) {
+ return caller == TableReaderCaller::kUserGet ||
+ caller == TableReaderCaller::kUserMultiGet;
+}
+
+bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) {
+ return caller == TableReaderCaller::kUserGet ||
+ caller == TableReaderCaller::kUserMultiGet ||
+ caller == TableReaderCaller::kUserIterator ||
+ caller == TableReaderCaller::kUserApproximateSize ||
+ caller == TableReaderCaller::kUserVerifyChecksum;
+}
+
+std::string BlockCacheTraceHelper::ComputeRowKey(
+ const BlockCacheTraceRecord& access) {
+ if (!IsGetOrMultiGet(access.caller)) {
+ return "";
+ }
+ Slice key = ExtractUserKey(access.referenced_key);
+ return std::to_string(access.sst_fd_number) + "_" + key.ToString();
+}
+
+uint64_t BlockCacheTraceHelper::GetTableId(
+ const BlockCacheTraceRecord& access) {
+ if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) {
+ return 0;
+ }
+ return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1;
+}
+
+uint64_t BlockCacheTraceHelper::GetSequenceNumber(
+ const BlockCacheTraceRecord& access) {
+ if (!IsGetOrMultiGet(access.caller)) {
+ return 0;
+ }
+ return access.get_from_user_specified_snapshot == Boolean::kFalse
+ ? 0
+ : 1 + GetInternalKeySeqno(access.referenced_key);
+}
+
+uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
+ const BlockCacheTraceRecord& access) {
+ Slice input(access.block_key);
+ uint64_t offset = 0;
+ while (true) {
+ uint64_t tmp = 0;
+ if (GetVarint64(&input, &tmp)) {
+ offset = tmp;
+ } else {
+ break;
+ }
+ }
+ return offset;
+}
+
// Takes ownership of `trace_writer`; `env` is only used for timestamps and
// must outlive this object.
BlockCacheTraceWriter::BlockCacheTraceWriter(
    Env* env, const TraceOptions& trace_options,
    std::unique_ptr<TraceWriter>&& trace_writer)
    : env_(env),
      trace_options_(trace_options),
      trace_writer_(std::move(trace_writer)) {}
+
// Serializes one access record and appends it to the trace file. The payload
// layout here must stay in sync with BlockCacheTraceReader::ReadAccess.
// Slices are passed separately to avoid copying the strings in `record`.
Status BlockCacheTraceWriter::WriteBlockAccess(
    const BlockCacheTraceRecord& record, const Slice& block_key,
    const Slice& cf_name, const Slice& referenced_key) {
  uint64_t trace_file_size = trace_writer_->GetFileSize();
  if (trace_file_size > trace_options_.max_trace_file_size) {
    // Silently stop recording once the size cap is reached; tracing is
    // best-effort and must not fail the DB operation.
    return Status::OK();
  }
  Trace trace;
  trace.ts = record.access_timestamp;
  trace.type = record.block_type;
  // Fields common to all accesses.
  PutLengthPrefixedSlice(&trace.payload, block_key);
  PutFixed64(&trace.payload, record.block_size);
  PutFixed64(&trace.payload, record.cf_id);
  PutLengthPrefixedSlice(&trace.payload, cf_name);
  PutFixed32(&trace.payload, record.level);
  PutFixed64(&trace.payload, record.sst_fd_number);
  trace.payload.push_back(record.caller);
  trace.payload.push_back(record.is_cache_hit);
  trace.payload.push_back(record.no_insert);
  // Fields present only for Get/MultiGet accesses.
  if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) {
    PutFixed64(&trace.payload, record.get_id);
    trace.payload.push_back(record.get_from_user_specified_snapshot);
    PutLengthPrefixedSlice(&trace.payload, referenced_key);
  }
  // Fields present only for Get/MultiGet accesses on data blocks.
  if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type,
                                                        record.caller)) {
    PutFixed64(&trace.payload, record.referenced_data_size);
    PutFixed64(&trace.payload, record.num_keys_in_block);
    trace.payload.push_back(record.referenced_key_exist_in_block);
  }
  std::string encoded_trace;
  TracerHelper::EncodeTrace(trace, &encoded_trace);
  return trace_writer_->Write(encoded_trace);
}
+
// Writes the trace-file header: a kTraceBegin record carrying the magic
// number and the RocksDB major/minor version, validated on read by
// BlockCacheTraceReader::ReadHeader.
Status BlockCacheTraceWriter::WriteHeader() {
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = TraceType::kTraceBegin;
  PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
  PutFixed32(&trace.payload, kMajorVersion);
  PutFixed32(&trace.payload, kMinorVersion);
  std::string encoded_trace;
  TracerHelper::EncodeTrace(trace, &encoded_trace);
  return trace_writer_->Write(encoded_trace);
}
+
// Takes ownership of `reader`; may be null for subclasses (e.g. the
// human-readable reader) that do not read the binary format.
BlockCacheTraceReader::BlockCacheTraceReader(
    std::unique_ptr<TraceReader>&& reader)
    : trace_reader_(std::move(reader)) {}
+
+Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
+ assert(header != nullptr);
+ std::string encoded_trace;
+ Status s = trace_reader_->Read(&encoded_trace);
+ if (!s.ok()) {
+ return s;
+ }
+ Trace trace;
+ s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+ if (!s.ok()) {
+ return s;
+ }
+ header->start_time = trace.ts;
+ Slice enc_slice = Slice(trace.payload);
+ Slice magnic_number;
+ if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read the magic number.");
+ }
+ if (magnic_number.ToString() != kTraceMagic) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Magic number does not match.");
+ }
+ if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read rocksdb major "
+ "version number.");
+ }
+ if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: Failed to read rocksdb minor "
+ "version number.");
+ }
+ // We should have retrieved all information in the header.
+ if (!enc_slice.empty()) {
+ return Status::Corruption(
+ "Corrupted header in the trace file: The length of header is too "
+ "long.");
+ }
+ return Status::OK();
+}
+
// Decodes one access record. The decode order must mirror the encode order
// in BlockCacheTraceWriter::WriteBlockAccess exactly; any short read maps to
// Status::Incomplete with a field-specific message.
Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
  assert(record);
  std::string encoded_trace;
  Status s = trace_reader_->Read(&encoded_trace);
  if (!s.ok()) {
    return s;
  }
  Trace trace;
  s = TracerHelper::DecodeTrace(encoded_trace, &trace);
  if (!s.ok()) {
    return s;
  }
  record->access_timestamp = trace.ts;
  record->block_type = trace.type;
  Slice enc_slice = Slice(trace.payload);

  // Fields common to all accesses.
  Slice block_key;
  if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read block key.");
  }
  record->block_key = block_key.ToString();
  if (!GetFixed64(&enc_slice, &record->block_size)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read block size.");
  }
  if (!GetFixed64(&enc_slice, &record->cf_id)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read column family ID.");
  }
  Slice cf_name;
  if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read column family name.");
  }
  record->cf_name = cf_name.ToString();
  if (!GetFixed32(&enc_slice, &record->level)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read level.");
  }
  if (!GetFixed64(&enc_slice, &record->sst_fd_number)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read SST file number.");
  }
  // Single-byte fields: check emptiness, read byte, advance by one.
  if (enc_slice.empty()) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read caller.");
  }
  record->caller = static_cast<TableReaderCaller>(enc_slice[0]);
  enc_slice.remove_prefix(kCharSize);
  if (enc_slice.empty()) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read is_cache_hit.");
  }
  record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
  enc_slice.remove_prefix(kCharSize);
  if (enc_slice.empty()) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read no_insert.");
  }
  record->no_insert = static_cast<Boolean>(enc_slice[0]);
  enc_slice.remove_prefix(kCharSize);
  // Fields present only for Get/MultiGet accesses.
  if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) {
    if (!GetFixed64(&enc_slice, &record->get_id)) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read the get id.");
    }
    if (enc_slice.empty()) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read "
          "get_from_user_specified_snapshot.");
    }
    record->get_from_user_specified_snapshot =
        static_cast<Boolean>(enc_slice[0]);
    enc_slice.remove_prefix(kCharSize);
    Slice referenced_key;
    if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read the referenced key.");
    }
    record->referenced_key = referenced_key.ToString();
  }
  // Fields present only for Get/MultiGet accesses on data blocks.
  if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type,
                                                        record->caller)) {
    if (!GetFixed64(&enc_slice, &record->referenced_data_size)) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read the referenced data size.");
    }
    if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read the number of keys in the "
          "block.");
    }
    if (enc_slice.empty()) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read "
          "referenced_key_exist_in_block.");
    }
    record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]);
  }
  return Status::OK();
}
+
// Flushes and closes the output file if one was opened; errors from
// Flush/Close are intentionally ignored in the destructor.
BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() {
  if (human_readable_trace_file_writer_) {
    human_readable_trace_file_writer_->Flush();
    human_readable_trace_file_writer_->Close();
  }
}
+
+Status BlockCacheHumanReadableTraceWriter::NewWritableFile(
+ const std::string& human_readable_trace_file_path,
+ ROCKSDB_NAMESPACE::Env* env) {
+ if (human_readable_trace_file_path.empty()) {
+ return Status::InvalidArgument(
+ "The provided human_readable_trace_file_path is null.");
+ }
+ return env->NewWritableFile(human_readable_trace_file_path,
+ &human_readable_trace_file_writer_, EnvOptions());
+}
+
// Formats one access as a 21-field CSV line (see the column order in the
// snprintf call; it must match BlockCacheHumanReadableTraceReader::ReadAccess)
// and appends it to the output file. `block_id` and `get_key_id` are
// caller-assigned surrogate ids replacing the raw block/get keys.
Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord(
    const BlockCacheTraceRecord& access, uint64_t block_id,
    uint64_t get_key_id) {
  if (!human_readable_trace_file_writer_) {
    // No output file configured: tracing in human-readable form is disabled.
    return Status::OK();
  }
  // NOTE(review): snprintf may truncate if the line exceeds
  // trace_record_buffer_ (1 MB); truncation is not detected here — only
  // encoding errors (ret < 0) are.
  int ret = snprintf(
      trace_record_buffer_, sizeof(trace_record_buffer_),
      "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32
      ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64
      ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n",
      access.access_timestamp, block_id, access.block_type, access.block_size,
      access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number,
      access.caller, access.no_insert, access.get_id, get_key_id,
      access.referenced_data_size, access.is_cache_hit,
      access.referenced_key_exist_in_block, access.num_keys_in_block,
      BlockCacheTraceHelper::GetTableId(access),
      BlockCacheTraceHelper::GetSequenceNumber(access),
      static_cast<uint64_t>(access.block_key.size()),
      static_cast<uint64_t>(access.referenced_key.size()),
      BlockCacheTraceHelper::GetBlockOffsetInFile(access));
  if (ret < 0) {
    return Status::IOError("failed to format the output");
  }
  std::string printout(trace_record_buffer_);
  return human_readable_trace_file_writer_->Append(printout);
}
+
// Opens the CSV trace file directly with an ifstream; the binary TraceReader
// in the base class is unused (null).
// NOTE(review): open() failure is not surfaced here — the first ReadAccess
// would return Incomplete instead. Confirm callers tolerate that.
BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader(
    const std::string& trace_file_path)
    : BlockCacheTraceReader(/*trace_reader=*/nullptr) {
  human_readable_trace_reader_.open(trace_file_path, std::ifstream::in);
}
+
// Closes the underlying ifstream.
BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() {
  human_readable_trace_reader_.close();
}
+
// Human-readable trace files carry no header; this is a no-op kept for
// interface parity with BlockCacheTraceReader.
Status BlockCacheHumanReadableTraceReader::ReadHeader(
    BlockCacheTraceHeader* /*header*/) {
  return Status::OK();
}
+
+Status BlockCacheHumanReadableTraceReader::ReadAccess(
+ BlockCacheTraceRecord* record) {
+ std::string line;
+ if (!std::getline(human_readable_trace_reader_, line)) {
+ return Status::Incomplete("No more records to read.");
+ }
+ std::stringstream ss(line);
+ std::vector<std::string> record_strs;
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+ record_strs.push_back(substr);
+ }
+ if (record_strs.size() != 21) {
+ return Status::Incomplete("Records format is wrong.");
+ }
+
+ record->access_timestamp = ParseUint64(record_strs[0]);
+ uint64_t block_key = ParseUint64(record_strs[1]);
+ record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2]));
+ record->block_size = ParseUint64(record_strs[3]);
+ record->cf_id = ParseUint64(record_strs[4]);
+ record->cf_name = record_strs[5];
+ record->level = static_cast<uint32_t>(ParseUint64(record_strs[6]));
+ record->sst_fd_number = ParseUint64(record_strs[7]);
+ record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8]));
+ record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9]));
+ record->get_id = ParseUint64(record_strs[10]);
+ uint64_t get_key_id = ParseUint64(record_strs[11]);
+
+ record->referenced_data_size = ParseUint64(record_strs[12]);
+ record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13]));
+ record->referenced_key_exist_in_block =
+ static_cast<Boolean>(ParseUint64(record_strs[14]));
+ record->num_keys_in_block = ParseUint64(record_strs[15]);
+ uint64_t table_id = ParseUint64(record_strs[16]);
+ if (table_id > 0) {
+ // Decrement since valid table id in the trace file equals traced table id
+ // + 1.
+ table_id -= 1;
+ }
+ uint64_t get_sequence_number = ParseUint64(record_strs[17]);
+ if (get_sequence_number > 0) {
+ record->get_from_user_specified_snapshot = Boolean::kTrue;
+ // Decrement since valid seq number in the trace file equals traced seq
+ // number + 1.
+ get_sequence_number -= 1;
+ }
+ uint64_t block_key_size = ParseUint64(record_strs[18]);
+ uint64_t get_key_size = ParseUint64(record_strs[19]);
+ uint64_t block_offset = ParseUint64(record_strs[20]);
+
+ std::string tmp_block_key;
+ PutVarint64(&tmp_block_key, block_key);
+ PutVarint64(&tmp_block_key, block_offset);
+ // Append 1 until the size is the same as traced block key size.
+ while (record->block_key.size() < block_key_size - tmp_block_key.size()) {
+ record->block_key += "1";
+ }
+ record->block_key += tmp_block_key;
+
+ if (get_key_id != 0) {
+ std::string tmp_get_key;
+ PutFixed64(&tmp_get_key, get_key_id);
+ PutFixed64(&tmp_get_key, get_sequence_number << 8);
+ PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id));
+ // Append 1 until the size is the same as traced key size.
+ while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) {
+ record->referenced_key += "1";
+ }
+ record->referenced_key += tmp_get_key;
+ }
+ return Status::OK();
+}
+
// Starts with no writer: tracing is disabled until StartTrace is called.
BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }

// Ensure any active trace is stopped and the writer is released.
BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
+
// Begins a trace session: installs a new BlockCacheTraceWriter and writes
// the file header. Returns Status::Busy if a session is already active.
Status BlockCacheTracer::StartTrace(
    Env* env, const TraceOptions& trace_options,
    std::unique_ptr<TraceWriter>&& trace_writer) {
  InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  if (writer_.load()) {
    return Status::Busy();
  }
  // Start get ids at 1; 0 is kReservedGetId ("no get").
  get_id_counter_.store(1);
  trace_options_ = trace_options;
  writer_.store(
      new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
  return writer_.load()->WriteHeader();
}
+
+void BlockCacheTracer::EndTrace() {
+ InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+ if (!writer_.load()) {
+ return;
+ }
+ delete writer_.load();
+ writer_.store(nullptr);
+}
+
// Records one block access if tracing is enabled and the key passes
// sampling. The first writer_ check is done without the mutex to keep the
// disabled-tracing fast path lock-free; it is re-checked under the mutex
// because EndTrace may run concurrently.
Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record,
                                          const Slice& block_key,
                                          const Slice& cf_name,
                                          const Slice& referenced_key) {
  if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) {
    return Status::OK();
  }
  InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  if (!writer_.load()) {
    return Status::OK();
  }
  return writer_.load()->WriteBlockAccess(record, block_key, cf_name,
                                          referenced_key);
}
+
+uint64_t BlockCacheTracer::NextGetId() {
+ if (!writer_.load(std::memory_order_relaxed)) {
+ return BlockCacheTraceHelper::kReservedGetId;
+ }
+ uint64_t prev_value = get_id_counter_.fetch_add(1);
+ if (prev_value == BlockCacheTraceHelper::kReservedGetId) {
+ // fetch and add again.
+ return get_id_counter_.fetch_add(1);
+ }
+ return prev_value;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/block_cache_tracer.h b/src/rocksdb/trace_replay/block_cache_tracer.h
new file mode 100644
index 000000000..5849273dc
--- /dev/null
+++ b/src/rocksdb/trace_replay/block_cache_tracer.h
@@ -0,0 +1,294 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <atomic>
+#include <fstream>
+
+#include "monitoring/instrumented_mutex.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "table/table_reader_caller.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+
// Time-unit conversion constants defined in block_cache_tracer.cc.
extern const uint64_t kMicrosInSecond;
extern const uint64_t kSecondInMinute;
extern const uint64_t kSecondInHour;

// Defined below; forward-declared for BlockCacheTraceHelper's signatures.
struct BlockCacheTraceRecord;
+
// Stateless helpers for classifying and interpreting block cache trace
// records.
class BlockCacheTraceHelper {
 public:
  // True iff the access is a Get/MultiGet read of a data block.
  static bool IsGetOrMultiGetOnDataBlock(TraceType block_type,
                                         TableReaderCaller caller);
  // True iff the caller is a user Get or MultiGet.
  static bool IsGetOrMultiGet(TableReaderCaller caller);
  // True iff the caller is any user-facing operation.
  static bool IsUserAccess(TableReaderCaller caller);
  // Row key is a concatenation of the access's fd_number and the referenced
  // user key.
  static std::string ComputeRowKey(const BlockCacheTraceRecord& access);
  // The first four bytes of the referenced key in a Get request is the table
  // id.
  static uint64_t GetTableId(const BlockCacheTraceRecord& access);
  // The sequence number of a get request is the last part of the referenced
  // key.
  static uint64_t GetSequenceNumber(const BlockCacheTraceRecord& access);
  // Block offset in a file is the last varint64 in the block key.
  static uint64_t GetBlockOffsetInFile(const BlockCacheTraceRecord& access);

  // Placeholder CF name when the column family is unknown.
  static const std::string kUnknownColumnFamilyName;
  // Get id 0 is reserved to mean "no associated Get/MultiGet".
  static const uint64_t kReservedGetId;
};
+
// Lookup context for tracing block cache accesses.
// We trace block accesses at five places:
// 1. BlockBasedTable::GetFilter
// 2. BlockBasedTable::GetUncompressedDict.
// 3. BlockBasedTable::MaybeReadAndLoadToCache. (To trace access on data, index,
// and range deletion block.)
// 4. BlockBasedTable::Get. (To trace the referenced key and whether the
// referenced key exists in a fetched data block.)
// 5. BlockBasedTable::MultiGet. (To trace the referenced key and whether the
// referenced key exists in a fetched data block.)
// The context is created at:
// 1. BlockBasedTable::Get. (kUserGet)
// 2. BlockBasedTable::MultiGet. (kUserMultiGet)
// 3. BlockBasedTable::NewIterator. (either kUserIterator, kCompaction, or
// external SST ingestion calls this function.)
// 4. BlockBasedTable::Open. (kPrefetch)
// 5. Index/Filter::CacheDependencies. (kPrefetch)
// 6. BlockBasedTable::ApproximateOffsetOf. (kCompaction or
// kUserApproximateSize).
struct BlockCacheLookupContext {
  BlockCacheLookupContext(const TableReaderCaller& _caller) : caller(_caller) {}
  BlockCacheLookupContext(const TableReaderCaller& _caller, uint64_t _get_id,
                          bool _get_from_user_specified_snapshot)
      : caller(_caller),
        get_id(_get_id),
        get_from_user_specified_snapshot(_get_from_user_specified_snapshot) {}
  const TableReaderCaller caller;
  // These are populated when we perform lookup/insert on block cache. The block
  // cache tracer uses this information when logging the block access at
  // BlockBasedTable::Get and BlockBasedTable::MultiGet.
  bool is_cache_hit = false;
  bool no_insert = false;
  TraceType block_type = TraceType::kTraceMax;
  uint64_t block_size = 0;
  std::string block_key;
  uint64_t num_keys_in_block = 0;
  // The unique id associated with Get and MultiGet. This enables us to track
  // how many blocks a Get/MultiGet request accesses. We can also measure the
  // impact of row cache vs block cache.
  uint64_t get_id = 0;
  std::string referenced_key;
  bool get_from_user_specified_snapshot = false;

  // Copies the cache-lookup results into this context in one call.
  void FillLookupContext(bool _is_cache_hit, bool _no_insert,
                         TraceType _block_type, uint64_t _block_size,
                         const std::string& _block_key,
                         uint64_t _num_keys_in_block) {
    is_cache_hit = _is_cache_hit;
    no_insert = _no_insert;
    block_type = _block_type;
    block_size = _block_size;
    block_key = _block_key;
    num_keys_in_block = _num_keys_in_block;
  }
};
+
+enum Boolean : char { kTrue = 1, kFalse = 0 };
+
// One traced block cache access. Which optional fields are meaningful
// depends on `caller` and `block_type` (see the section comments below).
struct BlockCacheTraceRecord {
  // Required fields for all accesses.
  uint64_t access_timestamp = 0;
  std::string block_key;
  TraceType block_type = TraceType::kTraceMax;
  uint64_t block_size = 0;
  uint64_t cf_id = 0;
  std::string cf_name;
  uint32_t level = 0;
  uint64_t sst_fd_number = 0;
  TableReaderCaller caller = TableReaderCaller::kMaxBlockCacheLookupCaller;
  Boolean is_cache_hit = Boolean::kFalse;
  Boolean no_insert = Boolean::kFalse;
  // Required field for Get and MultiGet
  uint64_t get_id = BlockCacheTraceHelper::kReservedGetId;
  Boolean get_from_user_specified_snapshot = Boolean::kFalse;
  std::string referenced_key;
  // Required fields for data block and user Get/Multi-Get only.
  uint64_t referenced_data_size = 0;
  uint64_t num_keys_in_block = 0;
  Boolean referenced_key_exist_in_block = Boolean::kFalse;

  BlockCacheTraceRecord() {}

  // Convenience constructor taking plain bools; they are converted to the
  // byte-sized Boolean enum used in the serialized format.
  BlockCacheTraceRecord(
      uint64_t _access_timestamp, std::string _block_key, TraceType _block_type,
      uint64_t _block_size, uint64_t _cf_id, std::string _cf_name,
      uint32_t _level, uint64_t _sst_fd_number, TableReaderCaller _caller,
      bool _is_cache_hit, bool _no_insert,
      uint64_t _get_id = BlockCacheTraceHelper::kReservedGetId,
      bool _get_from_user_specified_snapshot = false,
      std::string _referenced_key = "", uint64_t _referenced_data_size = 0,
      uint64_t _num_keys_in_block = 0,
      bool _referenced_key_exist_in_block = false)
      : access_timestamp(_access_timestamp),
        block_key(_block_key),
        block_type(_block_type),
        block_size(_block_size),
        cf_id(_cf_id),
        cf_name(_cf_name),
        level(_level),
        sst_fd_number(_sst_fd_number),
        caller(_caller),
        is_cache_hit(_is_cache_hit ? Boolean::kTrue : Boolean::kFalse),
        no_insert(_no_insert ? Boolean::kTrue : Boolean::kFalse),
        get_id(_get_id),
        get_from_user_specified_snapshot(_get_from_user_specified_snapshot
                                             ? Boolean::kTrue
                                             : Boolean::kFalse),
        referenced_key(_referenced_key),
        referenced_data_size(_referenced_data_size),
        num_keys_in_block(_num_keys_in_block),
        referenced_key_exist_in_block(
            _referenced_key_exist_in_block ? Boolean::kTrue : Boolean::kFalse) {
  }
};
+
// Metadata written at the start of a binary trace file (see
// BlockCacheTraceWriter::WriteHeader / BlockCacheTraceReader::ReadHeader).
struct BlockCacheTraceHeader {
  uint64_t start_time;
  uint32_t rocksdb_major_version;
  uint32_t rocksdb_minor_version;
};
+
// BlockCacheTraceWriter captures all RocksDB block cache accesses using a
// user-provided TraceWriter. Every RocksDB operation is written as a single
// trace. Each trace will have a timestamp and type, followed by the trace
// payload.
class BlockCacheTraceWriter {
 public:
  BlockCacheTraceWriter(Env* env, const TraceOptions& trace_options,
                        std::unique_ptr<TraceWriter>&& trace_writer);
  ~BlockCacheTraceWriter() = default;
  // No copy and move.
  BlockCacheTraceWriter(const BlockCacheTraceWriter&) = delete;
  BlockCacheTraceWriter& operator=(const BlockCacheTraceWriter&) = delete;
  BlockCacheTraceWriter(BlockCacheTraceWriter&&) = delete;
  BlockCacheTraceWriter& operator=(BlockCacheTraceWriter&&) = delete;

  // Pass Slice references to avoid copy.
  Status WriteBlockAccess(const BlockCacheTraceRecord& record,
                          const Slice& block_key, const Slice& cf_name,
                          const Slice& referenced_key);

  // Write a trace header at the beginning, typically on initiating a trace,
  // with some metadata like a magic number and RocksDB version.
  Status WriteHeader();

 private:
  Env* env_;                                   // Not owned; used for clock.
  TraceOptions trace_options_;                 // Sampling and size limits.
  std::unique_ptr<TraceWriter> trace_writer_;  // Owned output sink.
};
+
// Write a trace record in human readable format, see
// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format
// for details.
class BlockCacheHumanReadableTraceWriter {
 public:
  // Flushes and closes the output file if one was opened.
  ~BlockCacheHumanReadableTraceWriter();

  // Opens the CSV output file; InvalidArgument on an empty path.
  Status NewWritableFile(const std::string& human_readable_trace_file_path,
                         ROCKSDB_NAMESPACE::Env* env);

  // Appends one access as a CSV line; no-op when no file is open.
  Status WriteHumanReadableTraceRecord(const BlockCacheTraceRecord& access,
                                       uint64_t block_id, uint64_t get_key_id);

 private:
  // Scratch buffer for formatting one CSV line (1 MB).
  char trace_record_buffer_[1024 * 1024];
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
      human_readable_trace_file_writer_;
};
+
// BlockCacheTraceReader helps read the trace file generated by
// BlockCacheTraceWriter using a user provided TraceReader.
class BlockCacheTraceReader {
 public:
  BlockCacheTraceReader(std::unique_ptr<TraceReader>&& reader);
  ~BlockCacheTraceReader() = default;
  // No copy and move.
  BlockCacheTraceReader(const BlockCacheTraceReader&) = delete;
  BlockCacheTraceReader& operator=(const BlockCacheTraceReader&) = delete;
  BlockCacheTraceReader(BlockCacheTraceReader&&) = delete;
  BlockCacheTraceReader& operator=(BlockCacheTraceReader&&) = delete;

  // Reads and validates the file header (magic number, RocksDB version).
  Status ReadHeader(BlockCacheTraceHeader* header);

  // Decodes the next access record; Incomplete at end of file or on a
  // truncated record.
  Status ReadAccess(BlockCacheTraceRecord* record);

 private:
  std::unique_ptr<TraceReader> trace_reader_;
};
+
// Read a trace record in human readable format, see
// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format
// for detailed.
// NOTE(review): ReadHeader/ReadAccess are non-virtual in the base class, so
// these overloads hide rather than override them — calls through a
// BlockCacheTraceReader pointer/reference invoke the base versions. Confirm
// all callers use this class directly.
class BlockCacheHumanReadableTraceReader : public BlockCacheTraceReader {
 public:
  BlockCacheHumanReadableTraceReader(const std::string& trace_file_path);

  ~BlockCacheHumanReadableTraceReader();

  // No-op: human-readable trace files have no header.
  Status ReadHeader(BlockCacheTraceHeader* header);

  // Parses the next CSV line into an approximate trace record.
  Status ReadAccess(BlockCacheTraceRecord* record);

 private:
  std::ifstream human_readable_trace_reader_;
};
+
// A block cache tracer. It downsamples the accesses according to
// trace_options and uses BlockCacheTraceWriter to write the access record to
// the trace file.
class BlockCacheTracer {
 public:
  BlockCacheTracer();
  ~BlockCacheTracer();
  // No copy and move.
  BlockCacheTracer(const BlockCacheTracer&) = delete;
  BlockCacheTracer& operator=(const BlockCacheTracer&) = delete;
  BlockCacheTracer(BlockCacheTracer&&) = delete;
  BlockCacheTracer& operator=(BlockCacheTracer&&) = delete;

  // Start writing block cache accesses to the trace_writer.
  // Returns Status::Busy if a trace is already in progress.
  Status StartTrace(Env* env, const TraceOptions& trace_options,
                    std::unique_ptr<TraceWriter>&& trace_writer);

  // Stop writing block cache accesses to the trace_writer.
  void EndTrace();

  // Lock-free check; may race with Start/EndTrace by design.
  bool is_tracing_enabled() const {
    return writer_.load(std::memory_order_relaxed);
  }

  // Records one access if tracing is enabled and the key passes sampling.
  Status WriteBlockAccess(const BlockCacheTraceRecord& record,
                          const Slice& block_key, const Slice& cf_name,
                          const Slice& referenced_key);

  // GetId cycles from 1 to port::kMaxUint64.
  uint64_t NextGetId();

 private:
  TraceOptions trace_options_;
  // A mutex protects the writer_.
  InstrumentedMutex trace_writer_mutex_;
  // Atomic so is_tracing_enabled()/fast paths can read it without the mutex.
  std::atomic<BlockCacheTraceWriter*> writer_;
  std::atomic<uint64_t> get_id_counter_;
};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/block_cache_tracer_test.cc b/src/rocksdb/trace_replay/block_cache_tracer_test.cc
new file mode 100644
index 000000000..b29600890
--- /dev/null
+++ b/src/rocksdb/trace_replay/block_cache_tracer_test.cc
@@ -0,0 +1,378 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/block_cache_tracer.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+// Fixture-wide constants used to synthesize deterministic trace records.
+const uint64_t kBlockSize = 1024;
+const std::string kBlockKeyPrefix = "test-block-";
+const uint32_t kCFId = 0;
+const uint32_t kLevel = 1;
+const uint64_t kSSTFDNumber = 100;
+const std::string kRefKeyPrefix = "test-get-";
+const uint64_t kNumKeysInBlock = 1024;
+const uint64_t kReferencedDataSize = 10;
+}  // namespace
+
+// Test fixture: creates a per-test directory for the trace file and provides
+// helpers to generate, write and verify synthetic block cache accesses.
+class BlockCacheTracerTest : public testing::Test {
+ public:
+  BlockCacheTracerTest() {
+    test_path_ = test::PerThreadDBPath("block_cache_tracer_test");
+    env_ = ROCKSDB_NAMESPACE::Env::Default();
+    EXPECT_OK(env_->CreateDir(test_path_));
+    trace_file_path_ = test_path_ + "/block_cache_trace";
+  }
+
+  ~BlockCacheTracerTest() override {
+    EXPECT_OK(env_->DeleteFile(trace_file_path_));
+    EXPECT_OK(env_->DeleteDir(test_path_));
+  }
+
+  // Deterministically maps a key id to one of five callers so a generated
+  // trace mixes prefetch/compaction/Get/MultiGet/iterator accesses.
+  TableReaderCaller GetCaller(uint32_t key_id) {
+    uint32_t n = key_id % 5;
+    switch (n) {
+      case 0:
+        return TableReaderCaller::kPrefetch;
+      case 1:
+        return TableReaderCaller::kCompaction;
+      case 2:
+        return TableReaderCaller::kUserGet;
+      case 3:
+        return TableReaderCaller::kUserMultiGet;
+      case 4:
+        return TableReaderCaller::kUserIterator;
+    }
+    assert(false);
+    // Unreachable (n is always in [0, 4]); this return avoids falling off
+    // the end of a value-returning function, which is undefined behavior in
+    // release builds where assert() compiles away.
+    return TableReaderCaller::kPrefetch;
+  }
+
+  // Writes `nblocks` records of `block_type` with key ids starting at
+  // `from_key_id`, populating every optional field regardless of caller.
+  void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id,
+                        TraceType block_type, uint32_t nblocks) {
+    assert(writer);
+    for (uint32_t i = 0; i < nblocks; i++) {
+      uint32_t key_id = from_key_id + i;
+      BlockCacheTraceRecord record;
+      record.block_type = block_type;
+      record.block_size = kBlockSize + key_id;
+      record.block_key = (kBlockKeyPrefix + std::to_string(key_id));
+      record.access_timestamp = env_->NowMicros();
+      record.cf_id = kCFId;
+      record.cf_name = kDefaultColumnFamilyName;
+      record.caller = GetCaller(key_id);
+      record.level = kLevel;
+      record.sst_fd_number = kSSTFDNumber + key_id;
+      record.is_cache_hit = Boolean::kFalse;
+      record.no_insert = Boolean::kFalse;
+      // Provide get_id for all callers. The writer should only write get_id
+      // when the caller is either GET or MGET.
+      record.get_id = key_id + 1;
+      record.get_from_user_specified_snapshot = Boolean::kTrue;
+      // Provide these fields for all block types.
+      // The writer should only write these fields for data blocks and the
+      // caller is either GET or MGET.
+      record.referenced_key = (kRefKeyPrefix + std::to_string(key_id));
+      record.referenced_key_exist_in_block = Boolean::kTrue;
+      record.num_keys_in_block = kNumKeysInBlock;
+      record.referenced_data_size = kReferencedDataSize + key_id;
+      ASSERT_OK(writer->WriteBlockAccess(
+          record, record.block_key, record.cf_name, record.referenced_key));
+    }
+  }
+
+  // Returns a single data-block record for key id 0 (no referenced data
+  // size set; callers override fields as needed).
+  BlockCacheTraceRecord GenerateAccessRecord() {
+    uint32_t key_id = 0;
+    BlockCacheTraceRecord record;
+    record.block_type = TraceType::kBlockTraceDataBlock;
+    record.block_size = kBlockSize;
+    record.block_key = kBlockKeyPrefix + std::to_string(key_id);
+    record.access_timestamp = env_->NowMicros();
+    record.cf_id = kCFId;
+    record.cf_name = kDefaultColumnFamilyName;
+    record.caller = GetCaller(key_id);
+    record.level = kLevel;
+    record.sst_fd_number = kSSTFDNumber + key_id;
+    record.is_cache_hit = Boolean::kFalse;
+    record.no_insert = Boolean::kFalse;
+    record.referenced_key = kRefKeyPrefix + std::to_string(key_id);
+    record.referenced_key_exist_in_block = Boolean::kTrue;
+    record.num_keys_in_block = kNumKeysInBlock;
+    return record;
+  }
+
+  // Reads back `nblocks` records and checks that caller-dependent fields
+  // were persisted only for GET/MGET (and data blocks where applicable).
+  void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id,
+                    TraceType block_type, uint32_t nblocks) {
+    assert(reader);
+    for (uint32_t i = 0; i < nblocks; i++) {
+      uint32_t key_id = from_key_id + i;
+      BlockCacheTraceRecord record;
+      ASSERT_OK(reader->ReadAccess(&record));
+      ASSERT_EQ(block_type, record.block_type);
+      ASSERT_EQ(kBlockSize + key_id, record.block_size);
+      ASSERT_EQ(kBlockKeyPrefix + std::to_string(key_id), record.block_key);
+      ASSERT_EQ(kCFId, record.cf_id);
+      ASSERT_EQ(kDefaultColumnFamilyName, record.cf_name);
+      ASSERT_EQ(GetCaller(key_id), record.caller);
+      ASSERT_EQ(kLevel, record.level);
+      ASSERT_EQ(kSSTFDNumber + key_id, record.sst_fd_number);
+      ASSERT_EQ(Boolean::kFalse, record.is_cache_hit);
+      ASSERT_EQ(Boolean::kFalse, record.no_insert);
+      if (record.caller == TableReaderCaller::kUserGet ||
+          record.caller == TableReaderCaller::kUserMultiGet) {
+        ASSERT_EQ(key_id + 1, record.get_id);
+        ASSERT_EQ(Boolean::kTrue, record.get_from_user_specified_snapshot);
+        ASSERT_EQ(kRefKeyPrefix + std::to_string(key_id),
+                  record.referenced_key);
+      } else {
+        ASSERT_EQ(BlockCacheTraceHelper::kReservedGetId, record.get_id);
+        ASSERT_EQ(Boolean::kFalse, record.get_from_user_specified_snapshot);
+        ASSERT_EQ("", record.referenced_key);
+      }
+      if (block_type == TraceType::kBlockTraceDataBlock &&
+          (record.caller == TableReaderCaller::kUserGet ||
+           record.caller == TableReaderCaller::kUserMultiGet)) {
+        ASSERT_EQ(Boolean::kTrue, record.referenced_key_exist_in_block);
+        ASSERT_EQ(kNumKeysInBlock, record.num_keys_in_block);
+        ASSERT_EQ(kReferencedDataSize + key_id, record.referenced_data_size);
+        continue;
+      }
+      ASSERT_EQ(Boolean::kFalse, record.referenced_key_exist_in_block);
+      ASSERT_EQ(0, record.num_keys_in_block);
+      ASSERT_EQ(0, record.referenced_data_size);
+    }
+  }
+
+  Env* env_;
+  EnvOptions env_options_;
+  std::string trace_file_path_;
+  std::string test_path_;
+};
+
+TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) {
+  BlockCacheTraceRecord record = GenerateAccessRecord();
+  {
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    BlockCacheTracer writer;
+    // The record should NOT be written to the trace file since StartTrace
+    // has not been called; WriteBlockAccess is expected to be a no-op that
+    // still returns OK (verified by the empty-file check below).
+    ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+                                      record.referenced_key));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    // Verify trace file contains nothing.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    BlockCacheTraceReader reader(std::move(trace_reader));
+    BlockCacheTraceHeader header;
+    ASSERT_NOK(reader.ReadHeader(&header));
+  }
+}
+
+// A record written after StartTrace must be persisted, preceded by a valid
+// header, and readable back verbatim.
+TEST_F(BlockCacheTracerTest, AtomicWrite) {
+  BlockCacheTraceRecord record = GenerateAccessRecord();
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    BlockCacheTracer writer;
+    ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+    ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+                                      record.referenced_key));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    // Verify trace file contains one record.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    BlockCacheTraceReader reader(std::move(trace_reader));
+    BlockCacheTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
+    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
+    VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1);
+    ASSERT_NOK(reader.ReadAccess(&record));
+  }
+}
+
+TEST_F(BlockCacheTracerTest, ConsecutiveStartTrace) {
+  TraceOptions trace_opt;
+  std::unique_ptr<TraceWriter> trace_writer;
+  ASSERT_OK(
+      NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer));
+  BlockCacheTracer writer;
+  ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+  // A second StartTrace must fail while tracing is already active. Note that
+  // trace_writer is moved-from (null) by now, which is fine because the call
+  // is expected to be rejected before the writer is used.
+  ASSERT_NOK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+  ASSERT_OK(env_->FileExists(trace_file_path_));
+}
+
+// Writes after EndTrace must be dropped: the file should hold exactly the
+// one record written while tracing was active.
+TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) {
+  BlockCacheTraceRecord record = GenerateAccessRecord();
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    BlockCacheTracer writer;
+    ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+    ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+                                      record.referenced_key));
+    writer.EndTrace();
+    // Write the record again. This time the record should not be written since
+    // EndTrace is called.
+    ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
+                                      record.referenced_key));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    // Verify trace file contains one record.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    BlockCacheTraceReader reader(std::move(trace_reader));
+    BlockCacheTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
+    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
+    VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1);
+    ASSERT_NOK(reader.ReadAccess(&record));
+  }
+}
+
+// NextGetId must return the reserved id 0 while tracing is inactive, count
+// up from 1 while active, and restart at 1 after a new StartTrace.
+TEST_F(BlockCacheTracerTest, NextGetId) {
+  BlockCacheTracer writer;
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    // next get id should always return 0 before we call StartTrace.
+    ASSERT_EQ(0, writer.NextGetId());
+    ASSERT_EQ(0, writer.NextGetId());
+    ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+    ASSERT_EQ(1, writer.NextGetId());
+    ASSERT_EQ(2, writer.NextGetId());
+    writer.EndTrace();
+    // next get id should return 0.
+    ASSERT_EQ(0, writer.NextGetId());
+  }
+
+  // Start trace again and next get id should return 1.
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+    ASSERT_EQ(1, writer.NextGetId());
+  }
+}
+
+// Round-trips a trace containing all five block types and checks that
+// records come back in write order with caller-dependent fields intact.
+TEST_F(BlockCacheTracerTest, MixedBlocks) {
+  {
+    // Generate a trace file containing a mix of blocks.
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
+    ASSERT_OK(writer.WriteHeader());
+    // Write blocks of different types.
+    WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock,
+                     10);
+    WriteBlockAccess(&writer, 10, TraceType::kBlockTraceDataBlock, 10);
+    WriteBlockAccess(&writer, 20, TraceType::kBlockTraceFilterBlock, 10);
+    WriteBlockAccess(&writer, 30, TraceType::kBlockTraceIndexBlock, 10);
+    WriteBlockAccess(&writer, 40, TraceType::kBlockTraceRangeDeletionBlock, 10);
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+
+  {
+    // Verify trace file is generated correctly.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    BlockCacheTraceReader reader(std::move(trace_reader));
+    BlockCacheTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
+    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
+    // Read blocks.
+    VerifyAccess(&reader, 0, TraceType::kBlockTraceUncompressionDictBlock, 10);
+    VerifyAccess(&reader, 10, TraceType::kBlockTraceDataBlock, 10);
+    VerifyAccess(&reader, 20, TraceType::kBlockTraceFilterBlock, 10);
+    VerifyAccess(&reader, 30, TraceType::kBlockTraceIndexBlock, 10);
+    VerifyAccess(&reader, 40, TraceType::kBlockTraceRangeDeletionBlock, 10);
+    // Read one more record should report an error.
+    BlockCacheTraceRecord record;
+    ASSERT_NOK(reader.ReadAccess(&record));
+  }
+}
+
+// Round-trips one record through the human readable writer/reader and checks
+// that key-derived fields (table id, sequence number, block offset) decode.
+TEST_F(BlockCacheTracerTest, HumanReadableTrace) {
+  BlockCacheTraceRecord record = GenerateAccessRecord();
+  record.get_id = 1;
+  record.referenced_key = "";
+  record.caller = TableReaderCaller::kUserGet;
+  record.get_from_user_specified_snapshot = Boolean::kTrue;
+  record.referenced_data_size = kReferencedDataSize;
+  // Internal-key layout: table id, user key, packed sequence number/type.
+  PutFixed32(&record.referenced_key, 111);
+  PutLengthPrefixedSlice(&record.referenced_key, "get_key");
+  PutFixed64(&record.referenced_key, 2 << 8);
+  PutLengthPrefixedSlice(&record.block_key, "block_key");
+  PutVarint64(&record.block_key, 333);
+  {
+    // Generate a human readable trace file.
+    BlockCacheHumanReadableTraceWriter writer;
+    ASSERT_OK(writer.NewWritableFile(trace_file_path_, env_));
+    ASSERT_OK(writer.WriteHumanReadableTraceRecord(record, 1, 1));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    BlockCacheHumanReadableTraceReader reader(trace_file_path_);
+    BlockCacheTraceHeader header;
+    BlockCacheTraceRecord read_record;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_OK(reader.ReadAccess(&read_record));
+    ASSERT_EQ(TraceType::kBlockTraceDataBlock, read_record.block_type);
+    ASSERT_EQ(kBlockSize, read_record.block_size);
+    ASSERT_EQ(kCFId, read_record.cf_id);
+    ASSERT_EQ(kDefaultColumnFamilyName, read_record.cf_name);
+    ASSERT_EQ(TableReaderCaller::kUserGet, read_record.caller);
+    ASSERT_EQ(kLevel, read_record.level);
+    ASSERT_EQ(kSSTFDNumber, read_record.sst_fd_number);
+    ASSERT_EQ(Boolean::kFalse, read_record.is_cache_hit);
+    ASSERT_EQ(Boolean::kFalse, read_record.no_insert);
+    ASSERT_EQ(1, read_record.get_id);
+    ASSERT_EQ(Boolean::kTrue, read_record.get_from_user_specified_snapshot);
+    ASSERT_EQ(Boolean::kTrue, read_record.referenced_key_exist_in_block);
+    ASSERT_EQ(kNumKeysInBlock, read_record.num_keys_in_block);
+    ASSERT_EQ(kReferencedDataSize, read_record.referenced_data_size);
+    ASSERT_EQ(record.block_key.size(), read_record.block_key.size());
+    // Bug fix: the original compared record.referenced_key.size() against
+    // itself, which trivially passes; compare against the record read back.
+    ASSERT_EQ(record.referenced_key.size(), read_record.referenced_key.size());
+    ASSERT_EQ(112, BlockCacheTraceHelper::GetTableId(read_record));
+    ASSERT_EQ(3, BlockCacheTraceHelper::GetSequenceNumber(read_record));
+    ASSERT_EQ(333, BlockCacheTraceHelper::GetBlockOffsetInFile(read_record));
+    // Read again should fail.
+    ASSERT_NOK(reader.ReadAccess(&read_record));
+  }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+// Standard gtest entry point.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/trace_replay/trace_replay.cc b/src/rocksdb/trace_replay/trace_replay.cc
new file mode 100644
index 000000000..a0f9a504f
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_replay.cc
@@ -0,0 +1,485 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/trace_replay.h"
+
+#include <chrono>
+#include <sstream>
+#include <thread>
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/write_batch.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+#include "util/threadpool_imp.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+const std::string kTraceMagic = "feedcafedeadbeef";
+
+namespace {
+// Appends <cf_id, key> to *dst as a fixed32 column family id followed by a
+// length-prefixed key.
+void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
+  PutFixed32(dst, cf_id);
+  PutLengthPrefixedSlice(dst, key);
+}
+
+// Inverse of EncodeCFAndKey. The input is never mutated, so take it by
+// const reference (the original non-const reference was misleading). On
+// return *key points into `buffer`, which must outlive it.
+void DecodeCFAndKey(const std::string& buffer, uint32_t* cf_id, Slice* key) {
+  Slice buf(buffer);
+  GetFixed32(&buf, cf_id);
+  GetLengthPrefixedSlice(&buf, key);
+}
+}  // namespace
+
+// Serializes `trace` into *encoded_trace as:
+//   fixed64 timestamp | 1-byte type | fixed32 payload length | payload bytes.
+void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
+  assert(encoded_trace);
+  PutFixed64(encoded_trace, trace.ts);
+  encoded_trace->push_back(trace.type);
+  // NOTE(review): a payload larger than 4GiB would have its recorded length
+  // silently truncated by this fixed32 cast — presumably payloads are always
+  // small; confirm before relying on it.
+  PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
+  encoded_trace->append(trace.payload);
+}
+
+// Parses a record produced by EncodeTrace. Returns Status::Incomplete when
+// the input is too short to hold the fixed-size fields.
+Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
+                                 Trace* trace) {
+  assert(trace != nullptr);
+  Slice enc_slice = Slice(encoded_trace);
+  if (!GetFixed64(&enc_slice, &trace->ts)) {
+    return Status::Incomplete("Decode trace string failed");
+  }
+  if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
+    return Status::Incomplete("Decode trace string failed");
+  }
+  trace->type = static_cast<TraceType>(enc_slice[0]);
+  // The 4-byte payload-length field is skipped rather than validated; the
+  // payload is taken to be everything after the fixed fields.
+  enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
+  trace->payload = enc_slice.ToString();
+  return Status::OK();
+}
+
+// Constructs a tracer and immediately writes the file header.
+Tracer::Tracer(Env* env, const TraceOptions& trace_options,
+               std::unique_ptr<TraceWriter>&& trace_writer)
+    : env_(env),
+      trace_options_(trace_options),
+      trace_writer_(std::move(trace_writer)),
+      trace_request_count_ (0) {
+  // NOTE(review): the Status returned by WriteHeader() is discarded here; a
+  // failed header write will only surface on subsequent trace writes.
+  WriteHeader();
+}
+
+Tracer::~Tracer() {
+  // Release the writer, closing the underlying trace file.
+  trace_writer_.reset();
+}
+
+// Traces a write operation; the serialized WriteBatch is the payload.
+Status Tracer::Write(WriteBatch* write_batch) {
+  const TraceType trace_type = kTraceWrite;
+  if (ShouldSkipTrace(trace_type)) {
+    return Status::OK();
+  }
+  Trace trace;
+  trace.type = trace_type;
+  trace.ts = env_->NowMicros();
+  trace.payload = write_batch->Data();
+  return WriteTrace(trace);
+}
+
+// Traces a point lookup; payload is cf_id plus the length-prefixed key.
+Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
+  const TraceType trace_type = kTraceGet;
+  if (ShouldSkipTrace(trace_type)) {
+    return Status::OK();
+  }
+  Trace trace;
+  trace.type = trace_type;
+  trace.ts = env_->NowMicros();
+  EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
+  return WriteTrace(trace);
+}
+
+// Traces an iterator Seek; payload is cf_id plus the length-prefixed key.
+Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
+  const TraceType trace_type = kTraceIteratorSeek;
+  if (ShouldSkipTrace(trace_type)) {
+    return Status::OK();
+  }
+  Trace trace;
+  trace.type = trace_type;
+  trace.ts = env_->NowMicros();
+  EncodeCFAndKey(&trace.payload, cf_id, key);
+  return WriteTrace(trace);
+}
+
+// Traces an iterator SeekForPrev; payload layout matches IteratorSeek.
+Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
+  const TraceType trace_type = kTraceIteratorSeekForPrev;
+  if (ShouldSkipTrace(trace_type)) {
+    return Status::OK();
+  }
+  Trace trace;
+  trace.type = trace_type;
+  trace.ts = env_->NowMicros();
+  EncodeCFAndKey(&trace.payload, cf_id, key);
+  return WriteTrace(trace);
+}
+
+// Returns true when the operation must not be traced: the file is full, the
+// type is filtered out, or the request falls outside the sampling window.
+bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
+  if (IsTraceFileOverMax()) {
+    return true;
+  }
+  // Honor the user-configured filters for Get and Write operations.
+  const bool get_filtered =
+      (trace_options_.filter & kTraceFilterGet) && trace_type == kTraceGet;
+  const bool write_filtered =
+      (trace_options_.filter & kTraceFilterWrite) && trace_type == kTraceWrite;
+  if (get_filtered || write_filtered) {
+    return true;
+  }
+  // Sampling: keep one request out of every sampling_frequency.
+  ++trace_request_count_;
+  if (trace_request_count_ < trace_options_.sampling_frequency) {
+    return true;
+  }
+  trace_request_count_ = 0;
+  return false;
+}
+
+// True once the trace file has outgrown the configured size cap.
+bool Tracer::IsTraceFileOverMax() {
+  return trace_writer_->GetFileSize() > trace_options_.max_trace_file_size;
+}
+
+// Writes the kTraceBegin record whose payload is a human-readable line:
+// magic string, trace/RocksDB versions, and the record field layout.
+Status Tracer::WriteHeader() {
+  std::ostringstream header_stream;
+  header_stream << kTraceMagic << "\t"
+                << "Trace Version: 0.1\t"
+                << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion
+                << "\t"
+                << "Format: Timestamp OpType Payload\n";
+
+  Trace trace;
+  trace.type = kTraceBegin;
+  trace.ts = env_->NowMicros();
+  trace.payload = header_stream.str();
+  return WriteTrace(trace);
+}
+
+// Writes the kTraceEnd record (empty payload) marking a clean end of trace.
+Status Tracer::WriteFooter() {
+  Trace trace;
+  trace.type = kTraceEnd;
+  trace.ts = env_->NowMicros();
+  trace.payload = "";
+  return WriteTrace(trace);
+}
+
+// Serializes the record and hands it to the underlying TraceWriter.
+Status Tracer::WriteTrace(const Trace& trace) {
+  std::string encoded;
+  TracerHelper::EncodeTrace(trace, &encoded);
+  return trace_writer_->Write(Slice(encoded));
+}
+
+Status Tracer::Close() {
+  // Closing a trace just appends the footer record.
+  return WriteFooter();
+}
+
+// Builds a replayer over `db`, indexing the column family handles by id so
+// trace payloads can be decoded back to handles.
+Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
+                   std::unique_ptr<TraceReader>&& reader)
+    : trace_reader_(std::move(reader)) {
+  assert(db != nullptr);
+  // Replay against the root DB so layered wrappers are bypassed.
+  db_ = static_cast<DBImpl*>(db->GetRootDB());
+  env_ = Env::Default();
+  // Default replay speed: real time (no fast forward).
+  fast_forward_ = 1;
+  for (ColumnFamilyHandle* handle : handles) {
+    cf_map_[handle->GetID()] = handle;
+  }
+}
+
+Replayer::~Replayer() {
+  // Release the reader and its underlying trace file handle.
+  trace_reader_.reset();
+}
+
+// Sets the replay speed-up factor; a factor below 1x is rejected.
+Status Replayer::SetFastForward(uint32_t fast_forward) {
+  if (fast_forward < 1) {
+    return Status::InvalidArgument("Wrong fast forward speed!");
+  }
+  fast_forward_ = fast_forward;
+  return Status::OK();
+}
+
+// Single-threaded replay: reads records sequentially and re-issues each
+// operation, pacing by the recorded timestamps scaled by fast_forward_.
+Status Replayer::Replay() {
+  Status s;
+  Trace header;
+  s = ReadHeader(&header);
+  if (!s.ok()) {
+    return s;
+  }
+
+  std::chrono::system_clock::time_point replay_epoch =
+      std::chrono::system_clock::now();
+  WriteOptions woptions;
+  ReadOptions roptions;
+  Trace trace;
+  uint64_t ops = 0;
+  Iterator* single_iter = nullptr;
+  while (s.ok()) {
+    trace.reset();
+    s = ReadTrace(&trace);
+    if (!s.ok()) {
+      break;
+    }
+
+    // Sleep so this operation fires at the same offset from replay start as
+    // it had from trace start, divided by the fast-forward factor.
+    std::this_thread::sleep_until(
+        replay_epoch +
+        std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
+    if (trace.type == kTraceWrite) {
+      // NOTE(review): statuses of the replayed Write/Get/Seek calls below
+      // are intentionally ignored; replay is best-effort.
+      WriteBatch batch(trace.payload);
+      db_->Write(woptions, &batch);
+      ops++;
+    } else if (trace.type == kTraceGet) {
+      uint32_t cf_id = 0;
+      Slice key;
+      DecodeCFAndKey(trace.payload, &cf_id, &key);
+      // cf_id 0 means the default column family; other ids must be known.
+      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
+        return Status::Corruption("Invalid Column Family ID.");
+      }
+
+      std::string value;
+      if (cf_id == 0) {
+        db_->Get(roptions, key, &value);
+      } else {
+        db_->Get(roptions, cf_map_[cf_id], key, &value);
+      }
+      ops++;
+    } else if (trace.type == kTraceIteratorSeek) {
+      uint32_t cf_id = 0;
+      Slice key;
+      DecodeCFAndKey(trace.payload, &cf_id, &key);
+      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
+        return Status::Corruption("Invalid Column Family ID.");
+      }
+
+      // A fresh iterator is created and destroyed for every seek.
+      if (cf_id == 0) {
+        single_iter = db_->NewIterator(roptions);
+      } else {
+        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
+      }
+      single_iter->Seek(key);
+      ops++;
+      delete single_iter;
+    } else if (trace.type == kTraceIteratorSeekForPrev) {
+      // Currently, only the SeekForPrev() call itself is replayed.
+      uint32_t cf_id = 0;
+      Slice key;
+      DecodeCFAndKey(trace.payload, &cf_id, &key);
+      if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
+        return Status::Corruption("Invalid Column Family ID.");
+      }
+
+      if (cf_id == 0) {
+        single_iter = db_->NewIterator(roptions);
+      } else {
+        single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
+      }
+      single_iter->SeekForPrev(key);
+      ops++;
+      delete single_iter;
+    } else if (trace.type == kTraceEnd) {
+      // Do nothing for now.
+      // TODO: Add some validations later.
+      break;
+    }
+  }
+
+  if (s.IsIncomplete()) {
+    // Reaching eof returns Incomplete status at the moment.
+    // Could happen when killing a process without calling EndTrace() API.
+    // TODO: Add better error handling.
+    return Status::OK();
+  }
+  return s;
+}
+
+// The trace can be replayed with multiple threads by configuring the number
+// of threads in the thread pool. Trace records are read from the trace file
+// sequentially and the corresponding queries are scheduled in the task
+// queue based on the timestamp. Currently, we support Write_batch (Put,
+// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
+Status Replayer::MultiThreadReplay(uint32_t threads_num) {
+  Status s;
+  Trace header;
+  s = ReadHeader(&header);
+  if (!s.ok()) {
+    return s;
+  }
+
+  ThreadPoolImpl thread_pool;
+  thread_pool.SetHostEnv(env_);
+
+  // Always run at least one background thread.
+  if (threads_num > 1) {
+    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
+  } else {
+    thread_pool.SetBackgroundThreads(1);
+  }
+
+  std::chrono::system_clock::time_point replay_epoch =
+      std::chrono::system_clock::now();
+  WriteOptions woptions;
+  ReadOptions roptions;
+  uint64_t ops = 0;
+  while (s.ok()) {
+    // Each scheduled task takes ownership of its argument (released below);
+    // the BGWork* callbacks re-wrap it in a unique_ptr to free it.
+    std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
+    ra->db = db_;
+    s = ReadTrace(&(ra->trace_entry));
+    if (!s.ok()) {
+      break;
+    }
+    ra->woptions = woptions;
+    ra->roptions = roptions;
+
+    // Pace scheduling by the recorded timestamp, scaled by fast_forward_.
+    std::this_thread::sleep_until(
+        replay_epoch + std::chrono::microseconds(
+                           (ra->trace_entry.ts - header.ts) / fast_forward_));
+    if (ra->trace_entry.type == kTraceWrite) {
+      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
+                           nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceGet) {
+      thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
+                           nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
+      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
+                           nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
+      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
+                           nullptr, nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceEnd) {
+      // Do nothing for now.
+      // TODO: Add some validations later.
+      break;
+    } else {
+      // Other trace entry types that are not implemented for replay.
+      // To finish the replay, we continue the process.
+      continue;
+    }
+  }
+
+  if (s.IsIncomplete()) {
+    // Reaching eof returns Incomplete status at the moment.
+    // Could happen when killing a process without calling EndTrace() API.
+    // TODO: Add better error handling.
+    s = Status::OK();
+  }
+  thread_pool.JoinAllThreads();
+  return s;
+}
+
+// Reads and validates the header record: it must be a kTraceBegin record
+// whose payload starts with the magic string.
+Status Replayer::ReadHeader(Trace* header) {
+  assert(header != nullptr);
+  Status s = ReadTrace(header);
+  if (!s.ok()) {
+    return s;
+  }
+  if (header->type != kTraceBegin) {
+    return Status::Corruption("Corrupted trace file. Incorrect header.");
+  }
+  if (header->payload.compare(0, kTraceMagic.length(), kTraceMagic) != 0) {
+    return Status::Corruption("Corrupted trace file. Incorrect magic.");
+  }
+  return s;
+}
+
+// Reads the footer record and checks that it is of type kTraceEnd.
+Status Replayer::ReadFooter(Trace* footer) {
+  assert(footer != nullptr);
+  Status s = ReadTrace(footer);
+  if (!s.ok()) {
+    return s;
+  }
+  if (footer->type != kTraceEnd) {
+    return Status::Corruption("Corrupted trace file. Incorrect footer.");
+  }
+  // TODO: Add more validations later
+  return s;
+}
+
+// Fetches the next raw record from the reader and decodes it into *trace.
+Status Replayer::ReadTrace(Trace* trace) {
+  assert(trace != nullptr);
+  std::string encoded_trace;
+  const Status read_status = trace_reader_->Read(&encoded_trace);
+  if (!read_status.ok()) {
+    return read_status;
+  }
+  return TracerHelper::DecodeTrace(encoded_trace, trace);
+}
+
+// Replays a Get on a worker thread. `arg` is an owned ReplayerWorkerArg
+// released by MultiThreadReplay's scheduler; we re-wrap it to free it.
+void Replayer::BGWorkGet(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> work(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  auto handles =
+      static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+          work->cf_map);
+  uint32_t cf_id = 0;
+  Slice key;
+  DecodeCFAndKey(work->trace_entry.payload, &cf_id, &key);
+  // Silently drop queries that reference an unknown column family.
+  if (cf_id > 0 && handles->find(cf_id) == handles->end()) {
+    return;
+  }
+
+  std::string value;
+  if (cf_id == 0) {
+    work->db->Get(work->roptions, key, &value);
+  } else {
+    work->db->Get(work->roptions, (*handles)[cf_id], key, &value);
+  }
+}
+
+// Replays a WriteBatch on a worker thread; `arg` ownership is taken here.
+void Replayer::BGWorkWriteBatch(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> work(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  // The payload of a write trace is the serialized WriteBatch itself.
+  WriteBatch batch(work->trace_entry.payload);
+  work->db->Write(work->woptions, &batch);
+}
+
+// Replays an iterator Seek on a worker thread. `arg` is an owned
+// ReplayerWorkerArg released by MultiThreadReplay's scheduler.
+void Replayer::BGWorkIterSeek(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+      ra->cf_map);
+  uint32_t cf_id = 0;
+  Slice key;
+  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
+  // Silently drop queries that reference an unknown column family.
+  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
+    return;
+  }
+
+  // (Removed an unused local `std::string value;` from the original.)
+  Iterator* single_iter = nullptr;
+  if (cf_id == 0) {
+    single_iter = ra->db->NewIterator(ra->roptions);
+  } else {
+    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
+  }
+  single_iter->Seek(key);
+  delete single_iter;
+}
+
+// Replays an iterator SeekForPrev on a worker thread. `arg` is an owned
+// ReplayerWorkerArg released by MultiThreadReplay's scheduler.
+void Replayer::BGWorkIterSeekForPrev(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+      ra->cf_map);
+  uint32_t cf_id = 0;
+  Slice key;
+  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
+  // Silently drop queries that reference an unknown column family.
+  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
+    return;
+  }
+
+  // (Removed an unused local `std::string value;` from the original.)
+  Iterator* single_iter = nullptr;
+  if (cf_id == 0) {
+    single_iter = ra->db->NewIterator(ra->roptions);
+  } else {
+    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
+  }
+  single_iter->SeekForPrev(key);
+  delete single_iter;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/trace_replay/trace_replay.h b/src/rocksdb/trace_replay/trace_replay.h
new file mode 100644
index 000000000..e7ef598f0
--- /dev/null
+++ b/src/rocksdb/trace_replay/trace_replay.h
@@ -0,0 +1,189 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/trace_reader_writer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// This file contains Tracer and Replayer classes that enable capturing and
+// replaying RocksDB traces.
+
+class ColumnFamilyHandle;
+class ColumnFamilyData;
+class DB;
+class DBImpl;
+class Slice;
+class WriteBatch;
+
+extern const std::string kTraceMagic;
+// Sizes (in bytes) of the fixed-length fields that precede every trace
+// record's payload: an 8-byte timestamp, a 1-byte type tag, and a 4-byte
+// payload length.
+const unsigned int kTraceTimestampSize = 8;
+const unsigned int kTraceTypeSize = 1;
+const unsigned int kTracePayloadLengthSize = 4;
+// Total size of the per-record metadata preceding the payload.
+const unsigned int kTraceMetadataSize =
+    kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize;
+
+// Supported Trace types.
+// The underlying values are written into trace files (the 1-byte type tag),
+// so existing entries must never be renumbered; add new types before
+// kTraceMax only.
+enum TraceType : char {
+  kTraceBegin = 1,
+  kTraceEnd = 2,
+  kTraceWrite = 3,
+  kTraceGet = 4,
+  kTraceIteratorSeek = 5,
+  kTraceIteratorSeekForPrev = 6,
+  // Block cache related types.
+  kBlockTraceIndexBlock = 7,
+  kBlockTraceFilterBlock = 8,
+  kBlockTraceDataBlock = 9,
+  kBlockTraceUncompressionDictBlock = 10,
+  kBlockTraceRangeDeletionBlock = 11,
+  // All trace types should be added before kTraceMax
+  kTraceMax,
+};
+
+// TODO: This should also be made part of public interface to help users build
+// custom TracerReaders and TraceWriters.
+//
+// The data structure that defines a single trace record: a timestamp, a type
+// tag, and a type-specific serialized payload.
+struct Trace {
+  uint64_t ts;  // timestamp
+  TraceType type;
+  std::string payload;  // Type-specific serialized content of the record.
+
+  // Return the record to an empty state. kTraceMax doubles as the
+  // "no valid type" sentinel value.
+  void reset() {
+    ts = 0;
+    type = kTraceMax;
+    payload.clear();
+  }
+};
+
+// Stateless helpers shared by Tracer and Replayer for converting between
+// in-memory Trace objects and their serialized byte encoding.
+class TracerHelper {
+ public:
+  // Encode a trace object into the given string.
+  static void EncodeTrace(const Trace& trace, std::string* encoded_trace);
+
+  // Decode a string into the given trace object.
+  // Returns a non-OK status if the encoded bytes cannot be parsed.
+  static Status DecodeTrace(const std::string& encoded_trace, Trace* trace);
+};
+
+// Tracer captures all RocksDB operations using a user-provided TraceWriter.
+// Every RocksDB operation is written as a single trace. Each trace will have a
+// timestamp and type, followed by the trace payload.
+class Tracer {
+ public:
+  // env: used by the tracer internally; not owned.
+  // trace_options: filtering/sampling and file-size limits for this trace.
+  // trace_writer: sink that receives the encoded trace records; owned.
+  Tracer(Env* env, const TraceOptions& trace_options,
+         std::unique_ptr<TraceWriter>&& trace_writer);
+  ~Tracer();
+
+  // Trace all write operations -- Put, Merge, Delete, SingleDelete, Write
+  Status Write(WriteBatch* write_batch);
+
+  // Trace Get operations.
+  Status Get(ColumnFamilyHandle* cfname, const Slice& key);
+
+  // Trace Iterators.
+  Status IteratorSeek(const uint32_t& cf_id, const Slice& key);
+  Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
+
+  // Returns true if the trace is over the configured max trace file limit.
+  // False otherwise.
+  bool IsTraceFileOverMax();
+
+  // Writes a trace footer at the end of the tracing
+  Status Close();
+
+ private:
+  // Write a trace header at the beginning, typically on initiating a trace,
+  // with some metadata like a magic number, trace version, RocksDB version, and
+  // trace format.
+  Status WriteHeader();
+
+  // Write a trace footer, typically on ending a trace, with some metadata.
+  Status WriteFooter();
+
+  // Write a single trace using the provided TraceWriter to the underlying
+  // system, say, a filesystem or a streaming service.
+  Status WriteTrace(const Trace& trace);
+
+  // Helps in filtering and sampling of traces.
+  // Returns true if a trace should be skipped, false otherwise.
+  bool ShouldSkipTrace(const TraceType& type);
+
+  Env* env_;  // Not owned. NOTE(review): presumably supplies timestamps
+              // for trace records -- confirm in trace_replay.cc.
+  TraceOptions trace_options_;
+  std::unique_ptr<TraceWriter> trace_writer_;  // Owned sink for encoded traces.
+  uint64_t trace_request_count_;  // NOTE(review): presumably a running count of
+                                  // traced requests, used for sampling/limits.
+};
+
+// Replayer helps to replay the captured RocksDB operations, using a user
+// provided TraceReader.
+// The Replayer is instantiated via db_bench today, on using "replay" benchmark.
+class Replayer {
+ public:
+  // db: target database to replay against; not owned.
+  // handles: open column family handles, used to map trace CF ids to handles.
+  // reader: source of the encoded trace records; owned.
+  Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
+           std::unique_ptr<TraceReader>&& reader);
+  ~Replayer();
+
+  // Replay all the traces from the provided trace stream, taking the delay
+  // between the traces into consideration.
+  Status Replay();
+
+  // Replay the provided trace stream, which is the same as Replay(), with
+  // multi-threads. Queries are scheduled in the thread pool job queue.
+  // User can set the number of threads in the thread pool.
+  Status MultiThreadReplay(uint32_t threads_num);
+
+  // Enables fast forwarding a replay by reducing the delay between the ingested
+  // traces.
+  // fast_forward : Rate of replay speedup.
+  //   If 1, replay the operations at the same rate as in the trace stream.
+  //   If > 1, speed up the replay by this amount.
+  Status SetFastForward(uint32_t fast_forward);
+
+ private:
+  // Read and decode the header/footer/next record from the trace stream.
+  Status ReadHeader(Trace* header);
+  Status ReadFooter(Trace* footer);
+  Status ReadTrace(Trace* trace);
+
+  // The background function for MultiThreadReplay to execute Get query
+  // based on the trace records.
+  static void BGWorkGet(void* arg);
+
+  // The background function for MultiThreadReplay to execute WriteBatch
+  // (Put, Delete, SingleDelete, DeleteRange) based on the trace records.
+  static void BGWorkWriteBatch(void* arg);
+
+  // The background function for MultiThreadReplay to execute Iterator (Seek)
+  // based on the trace records.
+  static void BGWorkIterSeek(void* arg);
+
+  // The background function for MultiThreadReplay to execute Iterator
+  // (SeekForPrev) based on the trace records.
+  static void BGWorkIterSeekForPrev(void* arg);
+
+  DBImpl* db_;
+  Env* env_;
+  std::unique_ptr<TraceReader> trace_reader_;  // Owned source of trace records.
+  // Maps the column family id recorded in a trace to the live handle.
+  std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
+  uint32_t fast_forward_;  // Replay speedup rate set via SetFastForward().
+};
+
+// The per-record argument passed in to each MultiThreadReplay background job.
+struct ReplayerWorkerArg {
+  DB* db;  // Target database to replay against; not owned.
+  Trace trace_entry;  // The decoded trace record to execute.
+  // Maps column family ids to live handles; not owned.
+  std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
+  WriteOptions woptions;  // Options used for replayed writes.
+  ReadOptions roptions;   // Options used for replayed reads and iterators.
+};
+
+} // namespace ROCKSDB_NAMESPACE