diff options
Diffstat (limited to 'src/rocksdb/trace_replay')
-rw-r--r-- | src/rocksdb/trace_replay/block_cache_tracer.cc | 497 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/block_cache_tracer.h | 294 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/block_cache_tracer_test.cc | 378 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_replay.cc | 485 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_replay.h | 189 |
5 files changed, 1843 insertions, 0 deletions
diff --git a/src/rocksdb/trace_replay/block_cache_tracer.cc b/src/rocksdb/trace_replay/block_cache_tracer.cc new file mode 100644 index 000000000..9a96f1bac --- /dev/null +++ b/src/rocksdb/trace_replay/block_cache_tracer.cc @@ -0,0 +1,497 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/block_cache_tracer.h" + +#include <cinttypes> +#include <cstdio> +#include <cstdlib> + +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +const unsigned int kCharSize = 1; + +bool ShouldTrace(const Slice& block_key, const TraceOptions& trace_options) { + if (trace_options.sampling_frequency == 0 || + trace_options.sampling_frequency == 1) { + return true; + } + // We use spatial downsampling so that we have a complete access history for a + // block. 
+ return 0 == fastrange64(GetSliceNPHash64(block_key), + trace_options.sampling_frequency); +} +} // namespace + +const uint64_t kMicrosInSecond = 1000 * 1000; +const uint64_t kSecondInMinute = 60; +const uint64_t kSecondInHour = 3600; +const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName = + "UnknownColumnFamily"; +const uint64_t BlockCacheTraceHelper::kReservedGetId = 0; + +bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock( + TraceType block_type, TableReaderCaller caller) { + return (block_type == TraceType::kBlockTraceDataBlock) && + IsGetOrMultiGet(caller); +} + +bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) { + return caller == TableReaderCaller::kUserGet || + caller == TableReaderCaller::kUserMultiGet; +} + +bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) { + return caller == TableReaderCaller::kUserGet || + caller == TableReaderCaller::kUserMultiGet || + caller == TableReaderCaller::kUserIterator || + caller == TableReaderCaller::kUserApproximateSize || + caller == TableReaderCaller::kUserVerifyChecksum; +} + +std::string BlockCacheTraceHelper::ComputeRowKey( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller)) { + return ""; + } + Slice key = ExtractUserKey(access.referenced_key); + return std::to_string(access.sst_fd_number) + "_" + key.ToString(); +} + +uint64_t BlockCacheTraceHelper::GetTableId( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) { + return 0; + } + return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1; +} + +uint64_t BlockCacheTraceHelper::GetSequenceNumber( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller)) { + return 0; + } + return access.get_from_user_specified_snapshot == Boolean::kFalse + ? 
0 + : 1 + GetInternalKeySeqno(access.referenced_key); +} + +uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile( + const BlockCacheTraceRecord& access) { + Slice input(access.block_key); + uint64_t offset = 0; + while (true) { + uint64_t tmp = 0; + if (GetVarint64(&input, &tmp)) { + offset = tmp; + } else { + break; + } + } + return offset; +} + +BlockCacheTraceWriter::BlockCacheTraceWriter( + Env* env, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) + : env_(env), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)) {} + +Status BlockCacheTraceWriter::WriteBlockAccess( + const BlockCacheTraceRecord& record, const Slice& block_key, + const Slice& cf_name, const Slice& referenced_key) { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + if (trace_file_size > trace_options_.max_trace_file_size) { + return Status::OK(); + } + Trace trace; + trace.ts = record.access_timestamp; + trace.type = record.block_type; + PutLengthPrefixedSlice(&trace.payload, block_key); + PutFixed64(&trace.payload, record.block_size); + PutFixed64(&trace.payload, record.cf_id); + PutLengthPrefixedSlice(&trace.payload, cf_name); + PutFixed32(&trace.payload, record.level); + PutFixed64(&trace.payload, record.sst_fd_number); + trace.payload.push_back(record.caller); + trace.payload.push_back(record.is_cache_hit); + trace.payload.push_back(record.no_insert); + if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) { + PutFixed64(&trace.payload, record.get_id); + trace.payload.push_back(record.get_from_user_specified_snapshot); + PutLengthPrefixedSlice(&trace.payload, referenced_key); + } + if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type, + record.caller)) { + PutFixed64(&trace.payload, record.referenced_data_size); + PutFixed64(&trace.payload, record.num_keys_in_block); + trace.payload.push_back(record.referenced_key_exist_in_block); + } + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, 
&encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +Status BlockCacheTraceWriter::WriteHeader() { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = TraceType::kTraceBegin; + PutLengthPrefixedSlice(&trace.payload, kTraceMagic); + PutFixed32(&trace.payload, kMajorVersion); + PutFixed32(&trace.payload, kMinorVersion); + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +BlockCacheTraceReader::BlockCacheTraceReader( + std::unique_ptr<TraceReader>&& reader) + : trace_reader_(std::move(reader)) {} + +Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) { + assert(header != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + header->start_time = trace.ts; + Slice enc_slice = Slice(trace.payload); + Slice magnic_number; + if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read the magic number."); + } + if (magnic_number.ToString() != kTraceMagic) { + return Status::Corruption( + "Corrupted header in the trace file: Magic number does not match."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb major " + "version number."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb minor " + "version number."); + } + // We should have retrieved all information in the header. 
+ if (!enc_slice.empty()) { + return Status::Corruption( + "Corrupted header in the trace file: The length of header is too " + "long."); + } + return Status::OK(); +} + +Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) { + assert(record); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + record->access_timestamp = trace.ts; + record->block_type = trace.type; + Slice enc_slice = Slice(trace.payload); + + Slice block_key; + if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) { + return Status::Incomplete( + "Incomplete access record: Failed to read block key."); + } + record->block_key = block_key.ToString(); + if (!GetFixed64(&enc_slice, &record->block_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read block size."); + } + if (!GetFixed64(&enc_slice, &record->cf_id)) { + return Status::Incomplete( + "Incomplete access record: Failed to read column family ID."); + } + Slice cf_name; + if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) { + return Status::Incomplete( + "Incomplete access record: Failed to read column family name."); + } + record->cf_name = cf_name.ToString(); + if (!GetFixed32(&enc_slice, &record->level)) { + return Status::Incomplete( + "Incomplete access record: Failed to read level."); + } + if (!GetFixed64(&enc_slice, &record->sst_fd_number)) { + return Status::Incomplete( + "Incomplete access record: Failed to read SST file number."); + } + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read caller."); + } + record->caller = static_cast<TableReaderCaller>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read is_cache_hit."); + } + record->is_cache_hit = 
static_cast<Boolean>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read no_insert."); + } + record->no_insert = static_cast<Boolean>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) { + if (!GetFixed64(&enc_slice, &record->get_id)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the get id."); + } + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read " + "get_from_user_specified_snapshot."); + } + record->get_from_user_specified_snapshot = + static_cast<Boolean>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + Slice referenced_key; + if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the referenced key."); + } + record->referenced_key = referenced_key.ToString(); + } + if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type, + record->caller)) { + if (!GetFixed64(&enc_slice, &record->referenced_data_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the referenced data size."); + } + if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the number of keys in the " + "block."); + } + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read " + "referenced_key_exist_in_block."); + } + record->referenced_key_exist_in_block = static_cast<Boolean>(enc_slice[0]); + } + return Status::OK(); +} + +BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() { + if (human_readable_trace_file_writer_) { + human_readable_trace_file_writer_->Flush(); + human_readable_trace_file_writer_->Close(); + } +} + +Status BlockCacheHumanReadableTraceWriter::NewWritableFile( + const 
std::string& human_readable_trace_file_path, + ROCKSDB_NAMESPACE::Env* env) { + if (human_readable_trace_file_path.empty()) { + return Status::InvalidArgument( + "The provided human_readable_trace_file_path is null."); + } + return env->NewWritableFile(human_readable_trace_file_path, + &human_readable_trace_file_writer_, EnvOptions()); +} + +Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord( + const BlockCacheTraceRecord& access, uint64_t block_id, + uint64_t get_key_id) { + if (!human_readable_trace_file_writer_) { + return Status::OK(); + } + int ret = snprintf( + trace_record_buffer_, sizeof(trace_record_buffer_), + "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32 + ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64 + ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n", + access.access_timestamp, block_id, access.block_type, access.block_size, + access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number, + access.caller, access.no_insert, access.get_id, get_key_id, + access.referenced_data_size, access.is_cache_hit, + access.referenced_key_exist_in_block, access.num_keys_in_block, + BlockCacheTraceHelper::GetTableId(access), + BlockCacheTraceHelper::GetSequenceNumber(access), + static_cast<uint64_t>(access.block_key.size()), + static_cast<uint64_t>(access.referenced_key.size()), + BlockCacheTraceHelper::GetBlockOffsetInFile(access)); + if (ret < 0) { + return Status::IOError("failed to format the output"); + } + std::string printout(trace_record_buffer_); + return human_readable_trace_file_writer_->Append(printout); +} + +BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader( + const std::string& trace_file_path) + : BlockCacheTraceReader(/*trace_reader=*/nullptr) { + human_readable_trace_reader_.open(trace_file_path, std::ifstream::in); +} + +BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() { + human_readable_trace_reader_.close(); +} 
+ +Status BlockCacheHumanReadableTraceReader::ReadHeader( + BlockCacheTraceHeader* /*header*/) { + return Status::OK(); +} + +Status BlockCacheHumanReadableTraceReader::ReadAccess( + BlockCacheTraceRecord* record) { + std::string line; + if (!std::getline(human_readable_trace_reader_, line)) { + return Status::Incomplete("No more records to read."); + } + std::stringstream ss(line); + std::vector<std::string> record_strs; + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + record_strs.push_back(substr); + } + if (record_strs.size() != 21) { + return Status::Incomplete("Records format is wrong."); + } + + record->access_timestamp = ParseUint64(record_strs[0]); + uint64_t block_key = ParseUint64(record_strs[1]); + record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2])); + record->block_size = ParseUint64(record_strs[3]); + record->cf_id = ParseUint64(record_strs[4]); + record->cf_name = record_strs[5]; + record->level = static_cast<uint32_t>(ParseUint64(record_strs[6])); + record->sst_fd_number = ParseUint64(record_strs[7]); + record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8])); + record->no_insert = static_cast<Boolean>(ParseUint64(record_strs[9])); + record->get_id = ParseUint64(record_strs[10]); + uint64_t get_key_id = ParseUint64(record_strs[11]); + + record->referenced_data_size = ParseUint64(record_strs[12]); + record->is_cache_hit = static_cast<Boolean>(ParseUint64(record_strs[13])); + record->referenced_key_exist_in_block = + static_cast<Boolean>(ParseUint64(record_strs[14])); + record->num_keys_in_block = ParseUint64(record_strs[15]); + uint64_t table_id = ParseUint64(record_strs[16]); + if (table_id > 0) { + // Decrement since valid table id in the trace file equals traced table id + // + 1. 
+ table_id -= 1; + } + uint64_t get_sequence_number = ParseUint64(record_strs[17]); + if (get_sequence_number > 0) { + record->get_from_user_specified_snapshot = Boolean::kTrue; + // Decrement since valid seq number in the trace file equals traced seq + // number + 1. + get_sequence_number -= 1; + } + uint64_t block_key_size = ParseUint64(record_strs[18]); + uint64_t get_key_size = ParseUint64(record_strs[19]); + uint64_t block_offset = ParseUint64(record_strs[20]); + + std::string tmp_block_key; + PutVarint64(&tmp_block_key, block_key); + PutVarint64(&tmp_block_key, block_offset); + // Append 1 until the size is the same as traced block key size. + while (record->block_key.size() < block_key_size - tmp_block_key.size()) { + record->block_key += "1"; + } + record->block_key += tmp_block_key; + + if (get_key_id != 0) { + std::string tmp_get_key; + PutFixed64(&tmp_get_key, get_key_id); + PutFixed64(&tmp_get_key, get_sequence_number << 8); + PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id)); + // Append 1 until the size is the same as traced key size. 
+ while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) { + record->referenced_key += "1"; + } + record->referenced_key += tmp_get_key; + } + return Status::OK(); +} + +BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); } + +BlockCacheTracer::~BlockCacheTracer() { EndTrace(); } + +Status BlockCacheTracer::StartTrace( + Env* env, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (writer_.load()) { + return Status::Busy(); + } + get_id_counter_.store(1); + trace_options_ = trace_options; + writer_.store( + new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer))); + return writer_.load()->WriteHeader(); +} + +void BlockCacheTracer::EndTrace() { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return; + } + delete writer_.load(); + writer_.store(nullptr); +} + +Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record, + const Slice& block_key, + const Slice& cf_name, + const Slice& referenced_key) { + if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) { + return Status::OK(); + } + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return Status::OK(); + } + return writer_.load()->WriteBlockAccess(record, block_key, cf_name, + referenced_key); +} + +uint64_t BlockCacheTracer::NextGetId() { + if (!writer_.load(std::memory_order_relaxed)) { + return BlockCacheTraceHelper::kReservedGetId; + } + uint64_t prev_value = get_id_counter_.fetch_add(1); + if (prev_value == BlockCacheTraceHelper::kReservedGetId) { + // fetch and add again. 
+ return get_id_counter_.fetch_add(1); + } + return prev_value; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/block_cache_tracer.h b/src/rocksdb/trace_replay/block_cache_tracer.h new file mode 100644 index 000000000..5849273dc --- /dev/null +++ b/src/rocksdb/trace_replay/block_cache_tracer.h @@ -0,0 +1,294 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <fstream> + +#include "monitoring/instrumented_mutex.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/trace_reader_writer.h" +#include "table/table_reader_caller.h" +#include "trace_replay/trace_replay.h" + +namespace ROCKSDB_NAMESPACE { + +extern const uint64_t kMicrosInSecond; +extern const uint64_t kSecondInMinute; +extern const uint64_t kSecondInHour; + +struct BlockCacheTraceRecord; + +class BlockCacheTraceHelper { + public: + static bool IsGetOrMultiGetOnDataBlock(TraceType block_type, + TableReaderCaller caller); + static bool IsGetOrMultiGet(TableReaderCaller caller); + static bool IsUserAccess(TableReaderCaller caller); + // Row key is a concatenation of the access's fd_number and the referenced + // user key. + static std::string ComputeRowKey(const BlockCacheTraceRecord& access); + // The first four bytes of the referenced key in a Get request is the table + // id. + static uint64_t GetTableId(const BlockCacheTraceRecord& access); + // The sequence number of a get request is the last part of the referenced + // key. + static uint64_t GetSequenceNumber(const BlockCacheTraceRecord& access); + // Block offset in a file is the last varint64 in the block key. 
+ static uint64_t GetBlockOffsetInFile(const BlockCacheTraceRecord& access); + + static const std::string kUnknownColumnFamilyName; + static const uint64_t kReservedGetId; +}; + +// Lookup context for tracing block cache accesses. +// We trace block accesses at five places: +// 1. BlockBasedTable::GetFilter +// 2. BlockBasedTable::GetUncompressedDict. +// 3. BlockBasedTable::MaybeReadAndLoadToCache. (To trace access on data, index, +// and range deletion block.) +// 4. BlockBasedTable::Get. (To trace the referenced key and whether the +// referenced key exists in a fetched data block.) +// 5. BlockBasedTable::MultiGet. (To trace the referenced key and whether the +// referenced key exists in a fetched data block.) +// The context is created at: +// 1. BlockBasedTable::Get. (kUserGet) +// 2. BlockBasedTable::MultiGet. (kUserMGet) +// 3. BlockBasedTable::NewIterator. (either kUserIterator, kCompaction, or +// external SST ingestion calls this function.) +// 4. BlockBasedTable::Open. (kPrefetch) +// 5. Index/Filter::CacheDependencies. (kPrefetch) +// 6. BlockBasedTable::ApproximateOffsetOf. (kCompaction or +// kUserApproximateSize). +struct BlockCacheLookupContext { + BlockCacheLookupContext(const TableReaderCaller& _caller) : caller(_caller) {} + BlockCacheLookupContext(const TableReaderCaller& _caller, uint64_t _get_id, + bool _get_from_user_specified_snapshot) + : caller(_caller), + get_id(_get_id), + get_from_user_specified_snapshot(_get_from_user_specified_snapshot) {} + const TableReaderCaller caller; + // These are populated when we perform lookup/insert on block cache. The block + // cache tracer uses these inforation when logging the block access at + // BlockBasedTable::GET and BlockBasedTable::MultiGet. + bool is_cache_hit = false; + bool no_insert = false; + TraceType block_type = TraceType::kTraceMax; + uint64_t block_size = 0; + std::string block_key; + uint64_t num_keys_in_block = 0; + // The unique id associated with Get and MultiGet. 
This enables us to track + // how many blocks a Get/MultiGet request accesses. We can also measure the + // impact of row cache vs block cache. + uint64_t get_id = 0; + std::string referenced_key; + bool get_from_user_specified_snapshot = false; + + void FillLookupContext(bool _is_cache_hit, bool _no_insert, + TraceType _block_type, uint64_t _block_size, + const std::string& _block_key, + uint64_t _num_keys_in_block) { + is_cache_hit = _is_cache_hit; + no_insert = _no_insert; + block_type = _block_type; + block_size = _block_size; + block_key = _block_key; + num_keys_in_block = _num_keys_in_block; + } +}; + +enum Boolean : char { kTrue = 1, kFalse = 0 }; + +struct BlockCacheTraceRecord { + // Required fields for all accesses. + uint64_t access_timestamp = 0; + std::string block_key; + TraceType block_type = TraceType::kTraceMax; + uint64_t block_size = 0; + uint64_t cf_id = 0; + std::string cf_name; + uint32_t level = 0; + uint64_t sst_fd_number = 0; + TableReaderCaller caller = TableReaderCaller::kMaxBlockCacheLookupCaller; + Boolean is_cache_hit = Boolean::kFalse; + Boolean no_insert = Boolean::kFalse; + // Required field for Get and MultiGet + uint64_t get_id = BlockCacheTraceHelper::kReservedGetId; + Boolean get_from_user_specified_snapshot = Boolean::kFalse; + std::string referenced_key; + // Required fields for data block and user Get/Multi-Get only. 
+ uint64_t referenced_data_size = 0; + uint64_t num_keys_in_block = 0; + Boolean referenced_key_exist_in_block = Boolean::kFalse; + + BlockCacheTraceRecord() {} + + BlockCacheTraceRecord( + uint64_t _access_timestamp, std::string _block_key, TraceType _block_type, + uint64_t _block_size, uint64_t _cf_id, std::string _cf_name, + uint32_t _level, uint64_t _sst_fd_number, TableReaderCaller _caller, + bool _is_cache_hit, bool _no_insert, + uint64_t _get_id = BlockCacheTraceHelper::kReservedGetId, + bool _get_from_user_specified_snapshot = false, + std::string _referenced_key = "", uint64_t _referenced_data_size = 0, + uint64_t _num_keys_in_block = 0, + bool _referenced_key_exist_in_block = false) + : access_timestamp(_access_timestamp), + block_key(_block_key), + block_type(_block_type), + block_size(_block_size), + cf_id(_cf_id), + cf_name(_cf_name), + level(_level), + sst_fd_number(_sst_fd_number), + caller(_caller), + is_cache_hit(_is_cache_hit ? Boolean::kTrue : Boolean::kFalse), + no_insert(_no_insert ? Boolean::kTrue : Boolean::kFalse), + get_id(_get_id), + get_from_user_specified_snapshot(_get_from_user_specified_snapshot + ? Boolean::kTrue + : Boolean::kFalse), + referenced_key(_referenced_key), + referenced_data_size(_referenced_data_size), + num_keys_in_block(_num_keys_in_block), + referenced_key_exist_in_block( + _referenced_key_exist_in_block ? Boolean::kTrue : Boolean::kFalse) { + } +}; + +struct BlockCacheTraceHeader { + uint64_t start_time; + uint32_t rocksdb_major_version; + uint32_t rocksdb_minor_version; +}; + +// BlockCacheTraceWriter captures all RocksDB block cache accesses using a +// user-provided TraceWriter. Every RocksDB operation is written as a single +// trace. Each trace will have a timestamp and type, followed by the trace +// payload. 
+class BlockCacheTraceWriter { + public: + BlockCacheTraceWriter(Env* env, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + ~BlockCacheTraceWriter() = default; + // No copy and move. + BlockCacheTraceWriter(const BlockCacheTraceWriter&) = delete; + BlockCacheTraceWriter& operator=(const BlockCacheTraceWriter&) = delete; + BlockCacheTraceWriter(BlockCacheTraceWriter&&) = delete; + BlockCacheTraceWriter& operator=(BlockCacheTraceWriter&&) = delete; + + // Pass Slice references to avoid copy. + Status WriteBlockAccess(const BlockCacheTraceRecord& record, + const Slice& block_key, const Slice& cf_name, + const Slice& referenced_key); + + // Write a trace header at the beginning, typically on initiating a trace, + // with some metadata like a magic number and RocksDB version. + Status WriteHeader(); + + private: + Env* env_; + TraceOptions trace_options_; + std::unique_ptr<TraceWriter> trace_writer_; +}; + +// Write a trace record in human readable format, see +// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format +// for details. +class BlockCacheHumanReadableTraceWriter { + public: + ~BlockCacheHumanReadableTraceWriter(); + + Status NewWritableFile(const std::string& human_readable_trace_file_path, + ROCKSDB_NAMESPACE::Env* env); + + Status WriteHumanReadableTraceRecord(const BlockCacheTraceRecord& access, + uint64_t block_id, uint64_t get_key_id); + + private: + char trace_record_buffer_[1024 * 1024]; + std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> + human_readable_trace_file_writer_; +}; + +// BlockCacheTraceReader helps read the trace file generated by +// BlockCacheTraceWriter using a user provided TraceReader. +class BlockCacheTraceReader { + public: + BlockCacheTraceReader(std::unique_ptr<TraceReader>&& reader); + ~BlockCacheTraceReader() = default; + // No copy and move. 
+ BlockCacheTraceReader(const BlockCacheTraceReader&) = delete; + BlockCacheTraceReader& operator=(const BlockCacheTraceReader&) = delete; + BlockCacheTraceReader(BlockCacheTraceReader&&) = delete; + BlockCacheTraceReader& operator=(BlockCacheTraceReader&&) = delete; + + Status ReadHeader(BlockCacheTraceHeader* header); + + Status ReadAccess(BlockCacheTraceRecord* record); + + private: + std::unique_ptr<TraceReader> trace_reader_; +}; + +// Read a trace record in human readable format, see +// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format +// for detailed. +class BlockCacheHumanReadableTraceReader : public BlockCacheTraceReader { + public: + BlockCacheHumanReadableTraceReader(const std::string& trace_file_path); + + ~BlockCacheHumanReadableTraceReader(); + + Status ReadHeader(BlockCacheTraceHeader* header); + + Status ReadAccess(BlockCacheTraceRecord* record); + + private: + std::ifstream human_readable_trace_reader_; +}; + +// A block cache tracer. It downsamples the accesses according to +// trace_options and uses BlockCacheTraceWriter to write the access record to +// the trace file. +class BlockCacheTracer { + public: + BlockCacheTracer(); + ~BlockCacheTracer(); + // No copy and move. + BlockCacheTracer(const BlockCacheTracer&) = delete; + BlockCacheTracer& operator=(const BlockCacheTracer&) = delete; + BlockCacheTracer(BlockCacheTracer&&) = delete; + BlockCacheTracer& operator=(BlockCacheTracer&&) = delete; + + // Start writing block cache accesses to the trace_writer. + Status StartTrace(Env* env, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + + // Stop writing block cache accesses to the trace_writer. 
+ void EndTrace(); + + bool is_tracing_enabled() const { + return writer_.load(std::memory_order_relaxed); + } + + Status WriteBlockAccess(const BlockCacheTraceRecord& record, + const Slice& block_key, const Slice& cf_name, + const Slice& referenced_key); + + // GetId cycles from 1 to port::kMaxUint64. + uint64_t NextGetId(); + + private: + TraceOptions trace_options_; + // A mutex protects the writer_. + InstrumentedMutex trace_writer_mutex_; + std::atomic<BlockCacheTraceWriter*> writer_; + std::atomic<uint64_t> get_id_counter_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/block_cache_tracer_test.cc b/src/rocksdb/trace_replay/block_cache_tracer_test.cc new file mode 100644 index 000000000..b29600890 --- /dev/null +++ b/src/rocksdb/trace_replay/block_cache_tracer_test.cc @@ -0,0 +1,378 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "trace_replay/block_cache_tracer.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +const uint64_t kBlockSize = 1024; +const std::string kBlockKeyPrefix = "test-block-"; +const uint32_t kCFId = 0; +const uint32_t kLevel = 1; +const uint64_t kSSTFDNumber = 100; +const std::string kRefKeyPrefix = "test-get-"; +const uint64_t kNumKeysInBlock = 1024; +const uint64_t kReferencedDataSize = 10; +} // namespace + +class BlockCacheTracerTest : public testing::Test { + public: + BlockCacheTracerTest() { + test_path_ = test::PerThreadDBPath("block_cache_tracer_test"); + env_ = ROCKSDB_NAMESPACE::Env::Default(); + EXPECT_OK(env_->CreateDir(test_path_)); + trace_file_path_ = test_path_ + "/block_cache_trace"; + } + + ~BlockCacheTracerTest() override { + EXPECT_OK(env_->DeleteFile(trace_file_path_)); + EXPECT_OK(env_->DeleteDir(test_path_)); + } + + TableReaderCaller GetCaller(uint32_t key_id) { + uint32_t n = key_id % 5; + switch (n) { + case 0: + return TableReaderCaller::kPrefetch; + case 1: + return TableReaderCaller::kCompaction; + case 2: + return TableReaderCaller::kUserGet; + case 3: + return TableReaderCaller::kUserMultiGet; + case 4: + return TableReaderCaller::kUserIterator; + } + assert(false); + } + + void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id, + TraceType block_type, uint32_t nblocks) { + assert(writer); + for (uint32_t i = 0; i < nblocks; i++) { + uint32_t key_id = from_key_id + i; + BlockCacheTraceRecord record; + record.block_type = block_type; + record.block_size = kBlockSize + key_id; + record.block_key = (kBlockKeyPrefix + std::to_string(key_id)); + record.access_timestamp = env_->NowMicros(); + record.cf_id = kCFId; + record.cf_name = kDefaultColumnFamilyName; + record.caller = GetCaller(key_id); + record.level = kLevel; + record.sst_fd_number = 
kSSTFDNumber + key_id; + record.is_cache_hit = Boolean::kFalse; + record.no_insert = Boolean::kFalse; + // Provide get_id for all callers. The writer should only write get_id + // when the caller is either GET or MGET. + record.get_id = key_id + 1; + record.get_from_user_specified_snapshot = Boolean::kTrue; + // Provide these fields for all block types. + // The writer should only write these fields for data blocks and the + // caller is either GET or MGET. + record.referenced_key = (kRefKeyPrefix + std::to_string(key_id)); + record.referenced_key_exist_in_block = Boolean::kTrue; + record.num_keys_in_block = kNumKeysInBlock; + record.referenced_data_size = kReferencedDataSize + key_id; + ASSERT_OK(writer->WriteBlockAccess( + record, record.block_key, record.cf_name, record.referenced_key)); + } + } + + BlockCacheTraceRecord GenerateAccessRecord() { + uint32_t key_id = 0; + BlockCacheTraceRecord record; + record.block_type = TraceType::kBlockTraceDataBlock; + record.block_size = kBlockSize; + record.block_key = kBlockKeyPrefix + std::to_string(key_id); + record.access_timestamp = env_->NowMicros(); + record.cf_id = kCFId; + record.cf_name = kDefaultColumnFamilyName; + record.caller = GetCaller(key_id); + record.level = kLevel; + record.sst_fd_number = kSSTFDNumber + key_id; + record.is_cache_hit = Boolean::kFalse; + record.no_insert = Boolean::kFalse; + record.referenced_key = kRefKeyPrefix + std::to_string(key_id); + record.referenced_key_exist_in_block = Boolean::kTrue; + record.num_keys_in_block = kNumKeysInBlock; + return record; + } + + void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id, + TraceType block_type, uint32_t nblocks) { + assert(reader); + for (uint32_t i = 0; i < nblocks; i++) { + uint32_t key_id = from_key_id + i; + BlockCacheTraceRecord record; + ASSERT_OK(reader->ReadAccess(&record)); + ASSERT_EQ(block_type, record.block_type); + ASSERT_EQ(kBlockSize + key_id, record.block_size); + ASSERT_EQ(kBlockKeyPrefix + 
std::to_string(key_id), record.block_key);
+ ASSERT_EQ(kCFId, record.cf_id);
+ ASSERT_EQ(kDefaultColumnFamilyName, record.cf_name);
+ ASSERT_EQ(GetCaller(key_id), record.caller);
+ ASSERT_EQ(kLevel, record.level);
+ ASSERT_EQ(kSSTFDNumber + key_id, record.sst_fd_number);
+ ASSERT_EQ(Boolean::kFalse, record.is_cache_hit);
+ ASSERT_EQ(Boolean::kFalse, record.no_insert);
+ if (record.caller == TableReaderCaller::kUserGet ||
+ record.caller == TableReaderCaller::kUserMultiGet) {
+ ASSERT_EQ(key_id + 1, record.get_id);
+ ASSERT_EQ(Boolean::kTrue, record.get_from_user_specified_snapshot);
+ ASSERT_EQ(kRefKeyPrefix + std::to_string(key_id),
+ record.referenced_key);
+ } else {
+ ASSERT_EQ(BlockCacheTraceHelper::kReservedGetId, record.get_id);
+ ASSERT_EQ(Boolean::kFalse, record.get_from_user_specified_snapshot);
+ ASSERT_EQ("", record.referenced_key);
+ }
+ if (block_type == TraceType::kBlockTraceDataBlock &&
+ (record.caller == TableReaderCaller::kUserGet ||
+ record.caller == TableReaderCaller::kUserMultiGet)) {
+ ASSERT_EQ(Boolean::kTrue, record.referenced_key_exist_in_block);
+ ASSERT_EQ(kNumKeysInBlock, record.num_keys_in_block);
+ ASSERT_EQ(kReferencedDataSize + key_id, record.referenced_data_size);
+ continue;
+ }
+ ASSERT_EQ(Boolean::kFalse, record.referenced_key_exist_in_block);
+ ASSERT_EQ(0, record.num_keys_in_block);
+ ASSERT_EQ(0, record.referenced_data_size);
+ }
+ }
+
+ Env* env_;
+ EnvOptions env_options_;
+ std::string trace_file_path_;
+ std::string test_path_;
+};
+
+TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) {
+ BlockCacheTraceRecord record = GenerateAccessRecord();
+ {
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ BlockCacheTracer writer;
+ // The record should not be written to the trace_file since StartTrace is
+ // not called. 
+ ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains nothing. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_NOK(reader.ReadHeader(&header)); + } +} + +TEST_F(BlockCacheTracerTest, AtomicWrite) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTracer writer; + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. 
+ std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); + ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); + VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, ConsecutiveStartTrace) { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK( + NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); + BlockCacheTracer writer; + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_NOK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(env_->FileExists(trace_file_path_)); +} + +TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTracer writer; + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + writer.EndTrace(); + // Write the record again. This time the record should not be written since + // EndTrace is called. + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. 
+ std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); + ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); + VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, NextGetId) { + BlockCacheTracer writer; + { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + // next get id should always return 0 before we call StartTrace. + ASSERT_EQ(0, writer.NextGetId()); + ASSERT_EQ(0, writer.NextGetId()); + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_EQ(1, writer.NextGetId()); + ASSERT_EQ(2, writer.NextGetId()); + writer.EndTrace(); + // next get id should return 0. + ASSERT_EQ(0, writer.NextGetId()); + } + + // Start trace again and next get id should return 1. + { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_EQ(1, writer.NextGetId()); + } +} + +TEST_F(BlockCacheTracerTest, MixedBlocks) { + { + // Generate a trace file containing a mix of blocks. + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer)); + ASSERT_OK(writer.WriteHeader()); + // Write blocks of different types. 
+ WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock, + 10); + WriteBlockAccess(&writer, 10, TraceType::kBlockTraceDataBlock, 10); + WriteBlockAccess(&writer, 20, TraceType::kBlockTraceFilterBlock, 10); + WriteBlockAccess(&writer, 30, TraceType::kBlockTraceIndexBlock, 10); + WriteBlockAccess(&writer, 40, TraceType::kBlockTraceRangeDeletionBlock, 10); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + + { + // Verify trace file is generated correctly. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); + ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); + // Read blocks. + VerifyAccess(&reader, 0, TraceType::kBlockTraceUncompressionDictBlock, 10); + VerifyAccess(&reader, 10, TraceType::kBlockTraceDataBlock, 10); + VerifyAccess(&reader, 20, TraceType::kBlockTraceFilterBlock, 10); + VerifyAccess(&reader, 30, TraceType::kBlockTraceIndexBlock, 10); + VerifyAccess(&reader, 40, TraceType::kBlockTraceRangeDeletionBlock, 10); + // Read one more record should report an error. + BlockCacheTraceRecord record; + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, HumanReadableTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + record.get_id = 1; + record.referenced_key = ""; + record.caller = TableReaderCaller::kUserGet; + record.get_from_user_specified_snapshot = Boolean::kTrue; + record.referenced_data_size = kReferencedDataSize; + PutFixed32(&record.referenced_key, 111); + PutLengthPrefixedSlice(&record.referenced_key, "get_key"); + PutFixed64(&record.referenced_key, 2 << 8); + PutLengthPrefixedSlice(&record.block_key, "block_key"); + PutVarint64(&record.block_key, 333); + { + // Generate a human readable trace file. 
+ BlockCacheHumanReadableTraceWriter writer; + ASSERT_OK(writer.NewWritableFile(trace_file_path_, env_)); + ASSERT_OK(writer.WriteHumanReadableTraceRecord(record, 1, 1)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + BlockCacheHumanReadableTraceReader reader(trace_file_path_); + BlockCacheTraceHeader header; + BlockCacheTraceRecord read_record; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_OK(reader.ReadAccess(&read_record)); + ASSERT_EQ(TraceType::kBlockTraceDataBlock, read_record.block_type); + ASSERT_EQ(kBlockSize, read_record.block_size); + ASSERT_EQ(kCFId, read_record.cf_id); + ASSERT_EQ(kDefaultColumnFamilyName, read_record.cf_name); + ASSERT_EQ(TableReaderCaller::kUserGet, read_record.caller); + ASSERT_EQ(kLevel, read_record.level); + ASSERT_EQ(kSSTFDNumber, read_record.sst_fd_number); + ASSERT_EQ(Boolean::kFalse, read_record.is_cache_hit); + ASSERT_EQ(Boolean::kFalse, read_record.no_insert); + ASSERT_EQ(1, read_record.get_id); + ASSERT_EQ(Boolean::kTrue, read_record.get_from_user_specified_snapshot); + ASSERT_EQ(Boolean::kTrue, read_record.referenced_key_exist_in_block); + ASSERT_EQ(kNumKeysInBlock, read_record.num_keys_in_block); + ASSERT_EQ(kReferencedDataSize, read_record.referenced_data_size); + ASSERT_EQ(record.block_key.size(), read_record.block_key.size()); + ASSERT_EQ(record.referenced_key.size(), record.referenced_key.size()); + ASSERT_EQ(112, BlockCacheTraceHelper::GetTableId(read_record)); + ASSERT_EQ(3, BlockCacheTraceHelper::GetSequenceNumber(read_record)); + ASSERT_EQ(333, BlockCacheTraceHelper::GetBlockOffsetInFile(read_record)); + // Read again should fail. 
+ ASSERT_NOK(reader.ReadAccess(&read_record)); + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/trace_replay/trace_replay.cc b/src/rocksdb/trace_replay/trace_replay.cc new file mode 100644 index 000000000..a0f9a504f --- /dev/null +++ b/src/rocksdb/trace_replay/trace_replay.cc @@ -0,0 +1,485 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/trace_replay.h" + +#include <chrono> +#include <sstream> +#include <thread> +#include "db/db_impl/db_impl.h" +#include "rocksdb/slice.h" +#include "rocksdb/write_batch.h" +#include "util/coding.h" +#include "util/string_util.h" +#include "util/threadpool_imp.h" + +namespace ROCKSDB_NAMESPACE { + +const std::string kTraceMagic = "feedcafedeadbeef"; + +namespace { +void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) { + PutFixed32(dst, cf_id); + PutLengthPrefixedSlice(dst, key); +} + +void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { + Slice buf(buffer); + GetFixed32(&buf, cf_id); + GetLengthPrefixedSlice(&buf, key); +} +} // namespace + +void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) { + assert(encoded_trace); + PutFixed64(encoded_trace, trace.ts); + encoded_trace->push_back(trace.type); + PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size())); + encoded_trace->append(trace.payload); +} + +Status TracerHelper::DecodeTrace(const std::string& encoded_trace, + Trace* trace) { + assert(trace != nullptr); + Slice enc_slice = Slice(encoded_trace); + if (!GetFixed64(&enc_slice, &trace->ts)) { + return Status::Incomplete("Decode trace string failed"); + } + if (enc_slice.size() < 
kTraceTypeSize + kTracePayloadLengthSize) { + return Status::Incomplete("Decode trace string failed"); + } + trace->type = static_cast<TraceType>(enc_slice[0]); + enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); + trace->payload = enc_slice.ToString(); + return Status::OK(); +} + +Tracer::Tracer(Env* env, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) + : env_(env), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)), + trace_request_count_ (0) { + WriteHeader(); +} + +Tracer::~Tracer() { trace_writer_.reset(); } + +Status Tracer::Write(WriteBatch* write_batch) { + TraceType trace_type = kTraceWrite; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = trace_type; + trace.payload = write_batch->Data(); + return WriteTrace(trace); +} + +Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { + TraceType trace_type = kTraceGet; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = trace_type; + EncodeCFAndKey(&trace.payload, column_family->GetID(), key); + return WriteTrace(trace); +} + +Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { + TraceType trace_type = kTraceIteratorSeek; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = trace_type; + EncodeCFAndKey(&trace.payload, cf_id, key); + return WriteTrace(trace); +} + +Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { + TraceType trace_type = kTraceIteratorSeekForPrev; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = trace_type; + EncodeCFAndKey(&trace.payload, cf_id, key); + return WriteTrace(trace); +} + +bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { + if 
(IsTraceFileOverMax()) { + return true; + } + if ((trace_options_.filter & kTraceFilterGet + && trace_type == kTraceGet) + || (trace_options_.filter & kTraceFilterWrite + && trace_type == kTraceWrite)) { + return true; + } + ++trace_request_count_; + if (trace_request_count_ < trace_options_.sampling_frequency) { + return true; + } + trace_request_count_ = 0; + return false; +} + +bool Tracer::IsTraceFileOverMax() { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + return (trace_file_size > trace_options_.max_trace_file_size); +} + +Status Tracer::WriteHeader() { + std::ostringstream s; + s << kTraceMagic << "\t" + << "Trace Version: 0.1\t" + << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" + << "Format: Timestamp OpType Payload\n"; + std::string header(s.str()); + + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceBegin; + trace.payload = header; + return WriteTrace(trace); +} + +Status Tracer::WriteFooter() { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceEnd; + trace.payload = ""; + return WriteTrace(trace); +} + +Status Tracer::WriteTrace(const Trace& trace) { + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(Slice(encoded_trace)); +} + +Status Tracer::Close() { return WriteFooter(); } + +Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles, + std::unique_ptr<TraceReader>&& reader) + : trace_reader_(std::move(reader)) { + assert(db != nullptr); + db_ = static_cast<DBImpl*>(db->GetRootDB()); + env_ = Env::Default(); + for (ColumnFamilyHandle* cfh : handles) { + cf_map_[cfh->GetID()] = cfh; + } + fast_forward_ = 1; +} + +Replayer::~Replayer() { trace_reader_.reset(); } + +Status Replayer::SetFastForward(uint32_t fast_forward) { + Status s; + if (fast_forward < 1) { + s = Status::InvalidArgument("Wrong fast forward speed!"); + } else { + fast_forward_ = fast_forward; + s = Status::OK(); + } + return s; +} + 
+Status Replayer::Replay() { + Status s; + Trace header; + s = ReadHeader(&header); + if (!s.ok()) { + return s; + } + + std::chrono::system_clock::time_point replay_epoch = + std::chrono::system_clock::now(); + WriteOptions woptions; + ReadOptions roptions; + Trace trace; + uint64_t ops = 0; + Iterator* single_iter = nullptr; + while (s.ok()) { + trace.reset(); + s = ReadTrace(&trace); + if (!s.ok()) { + break; + } + + std::this_thread::sleep_until( + replay_epoch + + std::chrono::microseconds((trace.ts - header.ts) / fast_forward_)); + if (trace.type == kTraceWrite) { + WriteBatch batch(trace.payload); + db_->Write(woptions, &batch); + ops++; + } else if (trace.type == kTraceGet) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + std::string value; + if (cf_id == 0) { + db_->Get(roptions, key, &value); + } else { + db_->Get(roptions, cf_map_[cf_id], key, &value); + } + ops++; + } else if (trace.type == kTraceIteratorSeek) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + if (cf_id == 0) { + single_iter = db_->NewIterator(roptions); + } else { + single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + } + single_iter->Seek(key); + ops++; + delete single_iter; + } else if (trace.type == kTraceIteratorSeekForPrev) { + // Currently, only support to call the Seek() + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + if (cf_id == 0) { + single_iter = db_->NewIterator(roptions); + } else { + single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + } + single_iter->SeekForPrev(key); + ops++; + delete 
single_iter;
+ } else if (trace.type == kTraceEnd) {
+ // Do nothing for now.
+ // TODO: Add some validations later.
+ break;
+ }
+ }
+
+ if (s.IsIncomplete()) {
+ // Reaching eof returns Incomplete status at the moment.
+ // Could happen when killing a process without calling EndTrace() API.
+ // TODO: Add better error handling.
+ return Status::OK();
+ }
+ return s;
+}
+
+// The trace can be replayed with multiple threads by configuring the number
+// of threads in the thread pool. Trace records are read from the trace file
+// sequentially and the corresponding queries are scheduled in the task
+// queue based on the timestamp. Currently, we support WriteBatch (Put,
+// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
+Status Replayer::MultiThreadReplay(uint32_t threads_num) {
+ Status s;
+ Trace header;
+ s = ReadHeader(&header);
+ if (!s.ok()) {
+ return s;
+ }
+
+ ThreadPoolImpl thread_pool;
+ thread_pool.SetHostEnv(env_);
+
+ if (threads_num > 1) {
+ thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
+ } else {
+ thread_pool.SetBackgroundThreads(1);
+ }
+
+ std::chrono::system_clock::time_point replay_epoch =
+ std::chrono::system_clock::now();
+ WriteOptions woptions;
+ ReadOptions roptions;
+ uint64_t ops = 0;
+ while (s.ok()) {
+ std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
+ ra->db = db_;
+ s = ReadTrace(&(ra->trace_entry));
+ if (!s.ok()) {
+ break;
+ }
+ ra->woptions = woptions;
+ ra->roptions = roptions;
+
+ std::this_thread::sleep_until(
+ replay_epoch + std::chrono::microseconds(
+ (ra->trace_entry.ts - header.ts) / fast_forward_));
+ if (ra->trace_entry.type == kTraceWrite) {
+ thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
+ nullptr);
+ ops++;
+ } else if (ra->trace_entry.type == kTraceGet) {
+ thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
+ nullptr);
+ ops++;
+ } else if (ra->trace_entry.type == kTraceIteratorSeek) {
+ 
thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr, + nullptr); + ops++; + } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) { + thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(), + nullptr, nullptr); + ops++; + } else if (ra->trace_entry.type == kTraceEnd) { + // Do nothing for now. + // TODO: Add some validations later. + break; + } else { + // Other trace entry types that are not implemented for replay. + // To finish the replay, we continue the process. + continue; + } + } + + if (s.IsIncomplete()) { + // Reaching eof returns Incomplete status at the moment. + // Could happen when killing a process without calling EndTrace() API. + // TODO: Add better error handling. + s = Status::OK(); + } + thread_pool.JoinAllThreads(); + return s; +} + +Status Replayer::ReadHeader(Trace* header) { + assert(header != nullptr); + Status s = ReadTrace(header); + if (!s.ok()) { + return s; + } + if (header->type != kTraceBegin) { + return Status::Corruption("Corrupted trace file. Incorrect header."); + } + if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { + return Status::Corruption("Corrupted trace file. Incorrect magic."); + } + + return s; +} + +Status Replayer::ReadFooter(Trace* footer) { + assert(footer != nullptr); + Status s = ReadTrace(footer); + if (!s.ok()) { + return s; + } + if (footer->type != kTraceEnd) { + return Status::Corruption("Corrupted trace file. 
Incorrect footer."); + } + + // TODO: Add more validations later + return s; +} + +Status Replayer::ReadTrace(Trace* trace) { + assert(trace != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + return TracerHelper::DecodeTrace(encoded_trace, trace); +} + +void Replayer::BGWorkGet(void* arg) { + std::unique_ptr<ReplayerWorkerArg> ra( + reinterpret_cast<ReplayerWorkerArg*>(arg)); + auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>( + ra->cf_map); + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); + if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { + return; + } + + std::string value; + if (cf_id == 0) { + ra->db->Get(ra->roptions, key, &value); + } else { + ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value); + } + + return; +} + +void Replayer::BGWorkWriteBatch(void* arg) { + std::unique_ptr<ReplayerWorkerArg> ra( + reinterpret_cast<ReplayerWorkerArg*>(arg)); + WriteBatch batch(ra->trace_entry.payload); + ra->db->Write(ra->woptions, &batch); + return; +} + +void Replayer::BGWorkIterSeek(void* arg) { + std::unique_ptr<ReplayerWorkerArg> ra( + reinterpret_cast<ReplayerWorkerArg*>(arg)); + auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>( + ra->cf_map); + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); + if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { + return; + } + + std::string value; + Iterator* single_iter = nullptr; + if (cf_id == 0) { + single_iter = ra->db->NewIterator(ra->roptions); + } else { + single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]); + } + single_iter->Seek(key); + delete single_iter; + return; +} + +void Replayer::BGWorkIterSeekForPrev(void* arg) { + std::unique_ptr<ReplayerWorkerArg> ra( + reinterpret_cast<ReplayerWorkerArg*>(arg)); + auto cf_map = static_cast<std::unordered_map<uint32_t, 
ColumnFamilyHandle*>*>( + ra->cf_map); + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); + if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { + return; + } + + std::string value; + Iterator* single_iter = nullptr; + if (cf_id == 0) { + single_iter = ra->db->NewIterator(ra->roptions); + } else { + single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]); + } + single_iter->SeekForPrev(key); + delete single_iter; + return; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/trace_replay.h b/src/rocksdb/trace_replay/trace_replay.h new file mode 100644 index 000000000..e7ef598f0 --- /dev/null +++ b/src/rocksdb/trace_replay/trace_replay.h @@ -0,0 +1,189 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <memory> +#include <unordered_map> +#include <utility> + +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/trace_reader_writer.h" + +namespace ROCKSDB_NAMESPACE { + +// This file contains Tracer and Replayer classes that enable capturing and +// replaying RocksDB traces. + +class ColumnFamilyHandle; +class ColumnFamilyData; +class DB; +class DBImpl; +class Slice; +class WriteBatch; + +extern const std::string kTraceMagic; +const unsigned int kTraceTimestampSize = 8; +const unsigned int kTraceTypeSize = 1; +const unsigned int kTracePayloadLengthSize = 4; +const unsigned int kTraceMetadataSize = + kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize; + +// Supported Trace types. +enum TraceType : char { + kTraceBegin = 1, + kTraceEnd = 2, + kTraceWrite = 3, + kTraceGet = 4, + kTraceIteratorSeek = 5, + kTraceIteratorSeekForPrev = 6, + // Block cache related types. 
+ kBlockTraceIndexBlock = 7, + kBlockTraceFilterBlock = 8, + kBlockTraceDataBlock = 9, + kBlockTraceUncompressionDictBlock = 10, + kBlockTraceRangeDeletionBlock = 11, + // All trace types should be added before kTraceMax + kTraceMax, +}; + +// TODO: This should also be made part of public interface to help users build +// custom TracerReaders and TraceWriters. +// +// The data structure that defines a single trace. +struct Trace { + uint64_t ts; // timestamp + TraceType type; + std::string payload; + + void reset() { + ts = 0; + type = kTraceMax; + payload.clear(); + } +}; + +class TracerHelper { + public: + // Encode a trace object into the given string. + static void EncodeTrace(const Trace& trace, std::string* encoded_trace); + + // Decode a string into the given trace object. + static Status DecodeTrace(const std::string& encoded_trace, Trace* trace); +}; + +// Tracer captures all RocksDB operations using a user-provided TraceWriter. +// Every RocksDB operation is written as a single trace. Each trace will have a +// timestamp and type, followed by the trace payload. +class Tracer { + public: + Tracer(Env* env, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + ~Tracer(); + + // Trace all write operations -- Put, Merge, Delete, SingleDelete, Write + Status Write(WriteBatch* write_batch); + + // Trace Get operations. + Status Get(ColumnFamilyHandle* cfname, const Slice& key); + + // Trace Iterators. + Status IteratorSeek(const uint32_t& cf_id, const Slice& key); + Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); + + // Returns true if the trace is over the configured max trace file limit. + // False otherwise. + bool IsTraceFileOverMax(); + + // Writes a trace footer at the end of the tracing + Status Close(); + + private: + // Write a trace header at the beginning, typically on initiating a trace, + // with some metadata like a magic number, trace version, RocksDB version, and + // trace format. 
+ Status WriteHeader(); + + // Write a trace footer, typically on ending a trace, with some metadata. + Status WriteFooter(); + + // Write a single trace using the provided TraceWriter to the underlying + // system, say, a filesystem or a streaming service. + Status WriteTrace(const Trace& trace); + + // Helps in filtering and sampling of traces. + // Returns true if a trace should be skipped, false otherwise. + bool ShouldSkipTrace(const TraceType& type); + + Env* env_; + TraceOptions trace_options_; + std::unique_ptr<TraceWriter> trace_writer_; + uint64_t trace_request_count_; +}; + +// Replayer helps to replay the captured RocksDB operations, using a user +// provided TraceReader. +// The Replayer is instantiated via db_bench today, on using "replay" benchmark. +class Replayer { + public: + Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles, + std::unique_ptr<TraceReader>&& reader); + ~Replayer(); + + // Replay all the traces from the provided trace stream, taking the delay + // between the traces into consideration. + Status Replay(); + + // Replay the provide trace stream, which is the same as Replay(), with + // multi-threads. Queries are scheduled in the thread pool job queue. + // User can set the number of threads in the thread pool. + Status MultiThreadReplay(uint32_t threads_num); + + // Enables fast forwarding a replay by reducing the delay between the ingested + // traces. + // fast_forward : Rate of replay speedup. + // If 1, replay the operations at the same rate as in the trace stream. + // If > 1, speed up the replay by this amount. + Status SetFastForward(uint32_t fast_forward); + + private: + Status ReadHeader(Trace* header); + Status ReadFooter(Trace* footer); + Status ReadTrace(Trace* trace); + + // The background function for MultiThreadReplay to execute Get query + // based on the trace records. 
+ static void BGWorkGet(void* arg);
+
+ // The background function for MultiThreadReplay to execute WriteBatch
+ // (Put, Delete, SingleDelete, DeleteRange) based on the trace records.
+ static void BGWorkWriteBatch(void* arg);
+
+ // The background function for MultiThreadReplay to execute Iterator (Seek)
+ // based on the trace records.
+ static void BGWorkIterSeek(void* arg);
+
+ // The background function for MultiThreadReplay to execute Iterator
+ // (SeekForPrev) based on the trace records.
+ static void BGWorkIterSeekForPrev(void* arg);
+
+ DBImpl* db_;
+ Env* env_;
+ std::unique_ptr<TraceReader> trace_reader_;
+ std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
+ uint32_t fast_forward_;
+};
+
+// The pass-in arg of MultiThreadReplay for each trace record.
+struct ReplayerWorkerArg {
+ DB* db;
+ Trace trace_entry;
+ std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
+ WriteOptions woptions;
+ ReadOptions roptions;
+};
+
+} // namespace ROCKSDB_NAMESPACE |