diff options
Diffstat (limited to 'src/rocksdb/trace_replay')
-rw-r--r-- | src/rocksdb/trace_replay/block_cache_tracer.cc | 504 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/block_cache_tracer.h | 239 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/block_cache_tracer_test.cc | 421 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/io_tracer.cc | 303 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/io_tracer.h | 185 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/io_tracer_test.cc | 353 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_record.cc | 206 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_record_handler.cc | 190 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_record_handler.h | 46 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_record_result.cc | 146 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_replay.cc | 622 | ||||
-rw-r--r-- | src/rocksdb/trace_replay/trace_replay.h | 183 |
12 files changed, 3398 insertions, 0 deletions
diff --git a/src/rocksdb/trace_replay/block_cache_tracer.cc b/src/rocksdb/trace_replay/block_cache_tracer.cc new file mode 100644 index 000000000..508596913 --- /dev/null +++ b/src/rocksdb/trace_replay/block_cache_tracer.cc @@ -0,0 +1,504 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/block_cache_tracer.h" + +#include <cinttypes> +#include <cstdio> +#include <cstdlib> + +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "rocksdb/slice.h" +#include "rocksdb/trace_record.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +bool ShouldTrace(const Slice& block_key, + const BlockCacheTraceOptions& trace_options) { + if (trace_options.sampling_frequency == 0 || + trace_options.sampling_frequency == 1) { + return true; + } + // We use spatial downsampling so that we have a complete access history for a + // block. + return 0 == GetSliceRangedNPHash(block_key, trace_options.sampling_frequency); +} +} // namespace + +const uint64_t kMicrosInSecond = 1000 * 1000; +const uint64_t kSecondInMinute = 60; +const uint64_t kSecondInHour = 3600; +const std::string BlockCacheTraceHelper::kUnknownColumnFamilyName = + "UnknownColumnFamily"; +const uint64_t BlockCacheTraceRecord::kReservedGetId = 0; +const uint64_t BlockCacheTraceHelper::kReservedGetId = 0; + +bool BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock( + TraceType block_type, TableReaderCaller caller) { + return (block_type == TraceType::kBlockTraceDataBlock) && + IsGetOrMultiGet(caller); +} + +bool BlockCacheTraceHelper::IsGetOrMultiGet(TableReaderCaller caller) { + return caller == TableReaderCaller::kUserGet || + caller == TableReaderCaller::kUserMultiGet; +} + +bool BlockCacheTraceHelper::IsUserAccess(TableReaderCaller caller) { + return caller == TableReaderCaller::kUserGet || + caller == TableReaderCaller::kUserMultiGet || + caller == TableReaderCaller::kUserIterator || + caller == TableReaderCaller::kUserApproximateSize || + caller == TableReaderCaller::kUserVerifyChecksum; +} + +std::string BlockCacheTraceHelper::ComputeRowKey( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller)) { + return ""; + } + Slice key = ExtractUserKey(access.referenced_key); + return std::to_string(access.sst_fd_number) + "_" + key.ToString(); +} + +uint64_t BlockCacheTraceHelper::GetTableId( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller) || access.referenced_key.size() < 4) { + return 0; + } + return static_cast<uint64_t>(DecodeFixed32(access.referenced_key.data())) + 1; +} + +uint64_t BlockCacheTraceHelper::GetSequenceNumber( + const BlockCacheTraceRecord& access) { + if (!IsGetOrMultiGet(access.caller)) { + return 0; + } + return access.get_from_user_specified_snapshot + ? 1 + GetInternalKeySeqno(access.referenced_key) + : 0; +} + +uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile( + const BlockCacheTraceRecord& access) { + Slice input(access.block_key); + uint64_t offset = 0; + while (true) { + uint64_t tmp = 0; + if (GetVarint64(&input, &tmp)) { + offset = tmp; + } else { + break; + } + } + return offset; +} + +BlockCacheTraceWriterImpl::BlockCacheTraceWriterImpl( + SystemClock* clock, const BlockCacheTraceWriterOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) + : clock_(clock), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)) {} + +Status BlockCacheTraceWriterImpl::WriteBlockAccess( + const BlockCacheTraceRecord& record, const Slice& block_key, + const Slice& cf_name, const Slice& referenced_key) { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + if (trace_file_size > trace_options_.max_trace_file_size) { + return Status::OK(); + } + Trace trace; + trace.ts = record.access_timestamp; + trace.type = record.block_type; + PutLengthPrefixedSlice(&trace.payload, block_key); + PutFixed64(&trace.payload, record.block_size); + PutFixed64(&trace.payload, record.cf_id); + PutLengthPrefixedSlice(&trace.payload, cf_name); + PutFixed32(&trace.payload, record.level); + PutFixed64(&trace.payload, record.sst_fd_number); + trace.payload.push_back(record.caller); + trace.payload.push_back(record.is_cache_hit); + trace.payload.push_back(record.no_insert); + if (BlockCacheTraceHelper::IsGetOrMultiGet(record.caller)) { + PutFixed64(&trace.payload, record.get_id); + trace.payload.push_back(record.get_from_user_specified_snapshot); + PutLengthPrefixedSlice(&trace.payload, referenced_key); + } + if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record.block_type, + record.caller)) { + PutFixed64(&trace.payload, record.referenced_data_size); + PutFixed64(&trace.payload, record.num_keys_in_block); + trace.payload.push_back(record.referenced_key_exist_in_block); + } + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +Status BlockCacheTraceWriterImpl::WriteHeader() { + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = TraceType::kTraceBegin; + PutLengthPrefixedSlice(&trace.payload, kTraceMagic); + PutFixed32(&trace.payload, kMajorVersion); + PutFixed32(&trace.payload, kMinorVersion); + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +BlockCacheTraceReader::BlockCacheTraceReader( + std::unique_ptr<TraceReader>&& reader) + : trace_reader_(std::move(reader)) {} + +Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) { + assert(header != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + header->start_time = trace.ts; + Slice enc_slice = Slice(trace.payload); + Slice magnic_number; + if (!GetLengthPrefixedSlice(&enc_slice, &magnic_number)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read the magic number."); + } + if (magnic_number.ToString() != kTraceMagic) { + return Status::Corruption( + "Corrupted header in the trace file: Magic number does not match."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb major " + "version number."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb minor " + "version number."); + } + // We should have retrieved all information in the header. + if (!enc_slice.empty()) { + return Status::Corruption( + "Corrupted header in the trace file: The length of header is too " + "long."); + } + return Status::OK(); +} + +Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) { + assert(record); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + record->access_timestamp = trace.ts; + record->block_type = trace.type; + Slice enc_slice = Slice(trace.payload); + + const unsigned int kCharSize = 1; + + Slice block_key; + if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) { + return Status::Incomplete( + "Incomplete access record: Failed to read block key."); + } + record->block_key = block_key.ToString(); + if (!GetFixed64(&enc_slice, &record->block_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read block size."); + } + if (!GetFixed64(&enc_slice, &record->cf_id)) { + return Status::Incomplete( + "Incomplete access record: Failed to read column family ID."); + } + Slice cf_name; + if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) { + return Status::Incomplete( + "Incomplete access record: Failed to read column family name."); + } + record->cf_name = cf_name.ToString(); + if (!GetFixed32(&enc_slice, &record->level)) { + return Status::Incomplete( + "Incomplete access record: Failed to read level."); + } + if (!GetFixed64(&enc_slice, &record->sst_fd_number)) { + return Status::Incomplete( + "Incomplete access record: Failed to read SST file number."); + } + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read caller."); + } + record->caller = static_cast<TableReaderCaller>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read is_cache_hit."); + } + record->is_cache_hit = static_cast<char>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read no_insert."); + } + record->no_insert = static_cast<char>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + if (BlockCacheTraceHelper::IsGetOrMultiGet(record->caller)) { + if (!GetFixed64(&enc_slice, &record->get_id)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the get id."); + } + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read " + "get_from_user_specified_snapshot."); + } + record->get_from_user_specified_snapshot = static_cast<char>(enc_slice[0]); + enc_slice.remove_prefix(kCharSize); + Slice referenced_key; + if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the referenced key."); + } + record->referenced_key = referenced_key.ToString(); + } + if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(record->block_type, + record->caller)) { + if (!GetFixed64(&enc_slice, &record->referenced_data_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the referenced data size."); + } + if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) { + return Status::Incomplete( + "Incomplete access record: Failed to read the number of keys in the " + "block."); + } + if (enc_slice.empty()) { + return Status::Incomplete( + "Incomplete access record: Failed to read " + "referenced_key_exist_in_block."); + } + record->referenced_key_exist_in_block = static_cast<char>(enc_slice[0]); + } + return Status::OK(); +} + +BlockCacheHumanReadableTraceWriter::~BlockCacheHumanReadableTraceWriter() { + if (human_readable_trace_file_writer_) { + human_readable_trace_file_writer_->Flush().PermitUncheckedError(); + human_readable_trace_file_writer_->Close().PermitUncheckedError(); + } +} + +Status BlockCacheHumanReadableTraceWriter::NewWritableFile( + const std::string& human_readable_trace_file_path, + ROCKSDB_NAMESPACE::Env* env) { + if (human_readable_trace_file_path.empty()) { + return Status::InvalidArgument( + "The provided human_readable_trace_file_path is null."); + } + return env->NewWritableFile(human_readable_trace_file_path, + &human_readable_trace_file_writer_, EnvOptions()); +} + +Status BlockCacheHumanReadableTraceWriter::WriteHumanReadableTraceRecord( + const BlockCacheTraceRecord& access, uint64_t block_id, + uint64_t get_key_id) { + if (!human_readable_trace_file_writer_) { + return Status::OK(); + } + int ret = snprintf( + trace_record_buffer_, sizeof(trace_record_buffer_), + "%" PRIu64 ",%" PRIu64 ",%u,%" PRIu64 ",%" PRIu64 ",%s,%" PRIu32 + ",%" PRIu64 ",%u,%u,%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%u,%u,%" PRIu64 + ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "\n", + access.access_timestamp, block_id, access.block_type, access.block_size, + access.cf_id, access.cf_name.c_str(), access.level, access.sst_fd_number, + access.caller, access.no_insert, access.get_id, get_key_id, + access.referenced_data_size, access.is_cache_hit, + access.referenced_key_exist_in_block, access.num_keys_in_block, + BlockCacheTraceHelper::GetTableId(access), + BlockCacheTraceHelper::GetSequenceNumber(access), + static_cast<uint64_t>(access.block_key.size()), + static_cast<uint64_t>(access.referenced_key.size()), + BlockCacheTraceHelper::GetBlockOffsetInFile(access)); + if (ret < 0) { + return Status::IOError("failed to format the output"); + } + std::string printout(trace_record_buffer_); + return human_readable_trace_file_writer_->Append(printout); +} + +BlockCacheHumanReadableTraceReader::BlockCacheHumanReadableTraceReader( + const std::string& trace_file_path) + : BlockCacheTraceReader(/*trace_reader=*/nullptr) { + human_readable_trace_reader_.open(trace_file_path, std::ifstream::in); +} + +BlockCacheHumanReadableTraceReader::~BlockCacheHumanReadableTraceReader() { + human_readable_trace_reader_.close(); +} + +Status BlockCacheHumanReadableTraceReader::ReadHeader( + BlockCacheTraceHeader* /*header*/) { + return Status::OK(); +} + +Status BlockCacheHumanReadableTraceReader::ReadAccess( + BlockCacheTraceRecord* record) { + std::string line; + if (!std::getline(human_readable_trace_reader_, line)) { + return Status::Incomplete("No more records to read."); + } + std::stringstream ss(line); + std::vector<std::string> record_strs; + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + record_strs.push_back(substr); + } + if (record_strs.size() != 21) { + return Status::Incomplete("Records format is wrong."); + } + + record->access_timestamp = ParseUint64(record_strs[0]); + uint64_t block_key = ParseUint64(record_strs[1]); + record->block_type = static_cast<TraceType>(ParseUint64(record_strs[2])); + record->block_size = ParseUint64(record_strs[3]); + record->cf_id = ParseUint64(record_strs[4]); + record->cf_name = record_strs[5]; + record->level = static_cast<uint32_t>(ParseUint64(record_strs[6])); + record->sst_fd_number = ParseUint64(record_strs[7]); + record->caller = static_cast<TableReaderCaller>(ParseUint64(record_strs[8])); + record->no_insert = static_cast<char>(ParseUint64(record_strs[9])); + record->get_id = ParseUint64(record_strs[10]); + uint64_t get_key_id = ParseUint64(record_strs[11]); + + record->referenced_data_size = ParseUint64(record_strs[12]); + record->is_cache_hit = static_cast<char>(ParseUint64(record_strs[13])); + record->referenced_key_exist_in_block = + static_cast<char>(ParseUint64(record_strs[14])); + record->num_keys_in_block = ParseUint64(record_strs[15]); + uint64_t table_id = ParseUint64(record_strs[16]); + if (table_id > 0) { + // Decrement since valid table id in the trace file equals traced table id + // + 1. + table_id -= 1; + } + uint64_t get_sequence_number = ParseUint64(record_strs[17]); + if (get_sequence_number > 0) { + record->get_from_user_specified_snapshot = true; + // Decrement since valid seq number in the trace file equals traced seq + // number + 1. + get_sequence_number -= 1; + } + uint64_t block_key_size = ParseUint64(record_strs[18]); + uint64_t get_key_size = ParseUint64(record_strs[19]); + uint64_t block_offset = ParseUint64(record_strs[20]); + + std::string tmp_block_key; + PutVarint64(&tmp_block_key, block_key); + PutVarint64(&tmp_block_key, block_offset); + // Append 1 until the size is the same as traced block key size. + while (record->block_key.size() < block_key_size - tmp_block_key.size()) { + record->block_key += "1"; + } + record->block_key += tmp_block_key; + + if (get_key_id != 0) { + std::string tmp_get_key; + PutFixed64(&tmp_get_key, get_key_id); + PutFixed64(&tmp_get_key, get_sequence_number << 8); + PutFixed32(&record->referenced_key, static_cast<uint32_t>(table_id)); + // Append 1 until the size is the same as traced key size. + while (record->referenced_key.size() < get_key_size - tmp_get_key.size()) { + record->referenced_key += "1"; + } + record->referenced_key += tmp_get_key; + } + return Status::OK(); +} + +BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); } + +BlockCacheTracer::~BlockCacheTracer() { EndTrace(); } + +Status BlockCacheTracer::StartTrace( + const BlockCacheTraceOptions& trace_options, + std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (writer_.load()) { + return Status::Busy(); + } + get_id_counter_.store(1); + trace_options_ = trace_options; + writer_.store(trace_writer.release()); + return writer_.load()->WriteHeader(); +} + +void BlockCacheTracer::EndTrace() { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return; + } + delete writer_.load(); + writer_.store(nullptr); +} + +Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record, + const Slice& block_key, + const Slice& cf_name, + const Slice& referenced_key) { + if (!writer_.load() || !ShouldTrace(block_key, trace_options_)) { + return Status::OK(); + } + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return Status::OK(); + } + return writer_.load()->WriteBlockAccess(record, block_key, cf_name, + referenced_key); +} + +uint64_t BlockCacheTracer::NextGetId() { + if (!writer_.load(std::memory_order_relaxed)) { + return BlockCacheTraceHelper::kReservedGetId; + } + uint64_t prev_value = get_id_counter_.fetch_add(1); + if (prev_value == BlockCacheTraceHelper::kReservedGetId) { + // fetch and add again. + return get_id_counter_.fetch_add(1); + } + return prev_value; +} + +std::unique_ptr<BlockCacheTraceWriter> NewBlockCacheTraceWriter( + SystemClock* clock, const BlockCacheTraceWriterOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) { + return std::unique_ptr<BlockCacheTraceWriter>(new BlockCacheTraceWriterImpl( + clock, trace_options, std::move(trace_writer))); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/block_cache_tracer.h b/src/rocksdb/trace_replay/block_cache_tracer.h new file mode 100644 index 000000000..4a749608f --- /dev/null +++ b/src/rocksdb/trace_replay/block_cache_tracer.h @@ -0,0 +1,239 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <fstream> + +#include "monitoring/instrumented_mutex.h" +#include "rocksdb/block_cache_trace_writer.h" +#include "rocksdb/options.h" +#include "rocksdb/table_reader_caller.h" +#include "rocksdb/trace_reader_writer.h" +#include "trace_replay/trace_replay.h" + +namespace ROCKSDB_NAMESPACE { +class Env; +class SystemClock; + +extern const uint64_t kMicrosInSecond; +extern const uint64_t kSecondInMinute; +extern const uint64_t kSecondInHour; + +struct BlockCacheTraceRecord; + +class BlockCacheTraceHelper { + public: + static bool IsGetOrMultiGetOnDataBlock(TraceType block_type, + TableReaderCaller caller); + static bool IsGetOrMultiGet(TableReaderCaller caller); + static bool IsUserAccess(TableReaderCaller caller); + // Row key is a concatenation of the access's fd_number and the referenced + // user key. + static std::string ComputeRowKey(const BlockCacheTraceRecord& access); + // The first four bytes of the referenced key in a Get request is the table + // id. + static uint64_t GetTableId(const BlockCacheTraceRecord& access); + // The sequence number of a get request is the last part of the referenced + // key. + static uint64_t GetSequenceNumber(const BlockCacheTraceRecord& access); + // Block offset in a file is the last varint64 in the block key. + static uint64_t GetBlockOffsetInFile(const BlockCacheTraceRecord& access); + + static const std::string kUnknownColumnFamilyName; + static const uint64_t kReservedGetId; +}; + +// Lookup context for tracing block cache accesses. +// We trace block accesses at five places: +// 1. BlockBasedTable::GetFilter +// 2. BlockBasedTable::GetUncompressedDict. +// 3. BlockBasedTable::MaybeReadAndLoadToCache. (To trace access on data, index, +// and range deletion block.) +// 4. BlockBasedTable::Get. (To trace the referenced key and whether the +// referenced key exists in a fetched data block.) +// 5. BlockBasedTable::MultiGet. (To trace the referenced key and whether the +// referenced key exists in a fetched data block.) +// The context is created at: +// 1. BlockBasedTable::Get. (kUserGet) +// 2. BlockBasedTable::MultiGet. (kUserMGet) +// 3. BlockBasedTable::NewIterator. (either kUserIterator, kCompaction, or +// external SST ingestion calls this function.) +// 4. BlockBasedTable::Open. (kPrefetch) +// 5. Index/Filter::CacheDependencies. (kPrefetch) +// 6. BlockBasedTable::ApproximateOffsetOf. (kCompaction or +// kUserApproximateSize). +struct BlockCacheLookupContext { + BlockCacheLookupContext(const TableReaderCaller& _caller) : caller(_caller) {} + BlockCacheLookupContext(const TableReaderCaller& _caller, uint64_t _get_id, + bool _get_from_user_specified_snapshot) + : caller(_caller), + get_id(_get_id), + get_from_user_specified_snapshot(_get_from_user_specified_snapshot) {} + const TableReaderCaller caller; + // These are populated when we perform lookup/insert on block cache. The block + // cache tracer uses these inforation when logging the block access at + // BlockBasedTable::GET and BlockBasedTable::MultiGet. + bool is_cache_hit = false; + bool no_insert = false; + TraceType block_type = TraceType::kTraceMax; + uint64_t block_size = 0; + std::string block_key; + uint64_t num_keys_in_block = 0; + // The unique id associated with Get and MultiGet. This enables us to track + // how many blocks a Get/MultiGet request accesses. We can also measure the + // impact of row cache vs block cache. + uint64_t get_id = 0; + std::string referenced_key; + bool get_from_user_specified_snapshot = false; + + void FillLookupContext(bool _is_cache_hit, bool _no_insert, + TraceType _block_type, uint64_t _block_size, + const std::string& _block_key, + uint64_t _num_keys_in_block) { + is_cache_hit = _is_cache_hit; + no_insert = _no_insert; + block_type = _block_type; + block_size = _block_size; + block_key = _block_key; + num_keys_in_block = _num_keys_in_block; + } +}; + +struct BlockCacheTraceHeader { + uint64_t start_time; + uint32_t rocksdb_major_version; + uint32_t rocksdb_minor_version; +}; + +// BlockCacheTraceWriter captures all RocksDB block cache accesses using a +// user-provided TraceWriter. Every RocksDB operation is written as a single +// trace. Each trace will have a timestamp and type, followed by the trace +// payload. +class BlockCacheTraceWriterImpl : public BlockCacheTraceWriter { + public: + BlockCacheTraceWriterImpl(SystemClock* clock, + const BlockCacheTraceWriterOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + ~BlockCacheTraceWriterImpl() = default; + // No copy and move. + BlockCacheTraceWriterImpl(const BlockCacheTraceWriterImpl&) = delete; + BlockCacheTraceWriterImpl& operator=(const BlockCacheTraceWriterImpl&) = + delete; + BlockCacheTraceWriterImpl(BlockCacheTraceWriterImpl&&) = delete; + BlockCacheTraceWriterImpl& operator=(BlockCacheTraceWriterImpl&&) = delete; + + // Pass Slice references to avoid copy. + Status WriteBlockAccess(const BlockCacheTraceRecord& record, + const Slice& block_key, const Slice& cf_name, + const Slice& referenced_key); + + // Write a trace header at the beginning, typically on initiating a trace, + // with some metadata like a magic number and RocksDB version. + Status WriteHeader(); + + private: + SystemClock* clock_; + BlockCacheTraceWriterOptions trace_options_; + std::unique_ptr<TraceWriter> trace_writer_; +}; + +// Write a trace record in human readable format, see +// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format +// for details. +class BlockCacheHumanReadableTraceWriter { + public: + ~BlockCacheHumanReadableTraceWriter(); + + Status NewWritableFile(const std::string& human_readable_trace_file_path, + ROCKSDB_NAMESPACE::Env* env); + + Status WriteHumanReadableTraceRecord(const BlockCacheTraceRecord& access, + uint64_t block_id, uint64_t get_key_id); + + private: + char trace_record_buffer_[1024 * 1024]; + std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> + human_readable_trace_file_writer_; +}; + +// BlockCacheTraceReader helps read the trace file generated by +// BlockCacheTraceWriter using a user provided TraceReader. +class BlockCacheTraceReader { + public: + BlockCacheTraceReader(std::unique_ptr<TraceReader>&& reader); + virtual ~BlockCacheTraceReader() = default; + // No copy and move. + BlockCacheTraceReader(const BlockCacheTraceReader&) = delete; + BlockCacheTraceReader& operator=(const BlockCacheTraceReader&) = delete; + BlockCacheTraceReader(BlockCacheTraceReader&&) = delete; + BlockCacheTraceReader& operator=(BlockCacheTraceReader&&) = delete; + + Status ReadHeader(BlockCacheTraceHeader* header); + + Status ReadAccess(BlockCacheTraceRecord* record); + + private: + std::unique_ptr<TraceReader> trace_reader_; +}; + +// Read a trace record in human readable format, see +// https://github.com/facebook/rocksdb/wiki/Block-cache-analysis-and-simulation-tools#trace-format +// for detailed. +class BlockCacheHumanReadableTraceReader : public BlockCacheTraceReader { + public: + BlockCacheHumanReadableTraceReader(const std::string& trace_file_path); + + ~BlockCacheHumanReadableTraceReader(); + + Status ReadHeader(BlockCacheTraceHeader* header); + + Status ReadAccess(BlockCacheTraceRecord* record); + + private: + std::ifstream human_readable_trace_reader_; +}; + +// A block cache tracer. It downsamples the accesses according to +// trace_options and uses BlockCacheTraceWriter to write the access record to +// the trace file. +class BlockCacheTracer { + public: + BlockCacheTracer(); + ~BlockCacheTracer(); + // No copy and move. + BlockCacheTracer(const BlockCacheTracer&) = delete; + BlockCacheTracer& operator=(const BlockCacheTracer&) = delete; + BlockCacheTracer(BlockCacheTracer&&) = delete; + BlockCacheTracer& operator=(BlockCacheTracer&&) = delete; + + // Start writing block cache accesses to the trace_writer. + Status StartTrace(const BlockCacheTraceOptions& trace_options, + std::unique_ptr<BlockCacheTraceWriter>&& trace_writer); + + // Stop writing block cache accesses to the trace_writer. + void EndTrace(); + + bool is_tracing_enabled() const { + return writer_.load(std::memory_order_relaxed); + } + + Status WriteBlockAccess(const BlockCacheTraceRecord& record, + const Slice& block_key, const Slice& cf_name, + const Slice& referenced_key); + + // GetId cycles from 1 to std::numeric_limits<uint64_t>::max(). + uint64_t NextGetId(); + + private: + BlockCacheTraceOptions trace_options_; + // A mutex protects the writer_. + InstrumentedMutex trace_writer_mutex_; + std::atomic<BlockCacheTraceWriter*> writer_; + std::atomic<uint64_t> get_id_counter_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/block_cache_tracer_test.cc b/src/rocksdb/trace_replay/block_cache_tracer_test.cc new file mode 100644 index 000000000..f9d0773bf --- /dev/null +++ b/src/rocksdb/trace_replay/block_cache_tracer_test.cc @@ -0,0 +1,421 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/block_cache_tracer.h" + +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +const uint64_t kBlockSize = 1024; +const std::string kBlockKeyPrefix = "test-block-"; +const uint32_t kCFId = 0; +const uint32_t kLevel = 1; +const uint64_t kSSTFDNumber = 100; +const std::string kRefKeyPrefix = "test-get-"; +const uint64_t kNumKeysInBlock = 1024; +const uint64_t kReferencedDataSize = 10; +} // namespace + +class BlockCacheTracerTest : public testing::Test { + public: + BlockCacheTracerTest() { + test_path_ = test::PerThreadDBPath("block_cache_tracer_test"); + env_ = ROCKSDB_NAMESPACE::Env::Default(); + clock_ = env_->GetSystemClock().get(); + EXPECT_OK(env_->CreateDir(test_path_)); + trace_file_path_ = test_path_ + "/block_cache_trace"; + } + + ~BlockCacheTracerTest() override { + EXPECT_OK(env_->DeleteFile(trace_file_path_)); + EXPECT_OK(env_->DeleteDir(test_path_)); + } + + TableReaderCaller GetCaller(uint32_t key_id) { + uint32_t n = key_id % 5; + switch (n) { + case 0: + return TableReaderCaller::kPrefetch; + case 1: + return TableReaderCaller::kCompaction; + case 2: + return TableReaderCaller::kUserGet; + case 3: + return TableReaderCaller::kUserMultiGet; + case 4: + return TableReaderCaller::kUserIterator; + } + assert(false); + return TableReaderCaller::kMaxBlockCacheLookupCaller; + } + + void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id, + TraceType block_type, uint32_t nblocks) { + assert(writer); + for (uint32_t i = 0; i < nblocks; i++) { + uint32_t key_id = from_key_id + i; + BlockCacheTraceRecord record; + record.block_type = block_type; + record.block_size = kBlockSize + key_id; + record.block_key = (kBlockKeyPrefix + std::to_string(key_id)); + record.access_timestamp = clock_->NowMicros(); + record.cf_id = kCFId; + record.cf_name = kDefaultColumnFamilyName; + record.caller = GetCaller(key_id); + record.level = kLevel; + record.sst_fd_number = kSSTFDNumber + key_id; + record.is_cache_hit = false; + record.no_insert = false; + // Provide get_id for all callers. The writer should only write get_id + // when the caller is either GET or MGET. + record.get_id = key_id + 1; + record.get_from_user_specified_snapshot = true; + // Provide these fields for all block types. + // The writer should only write these fields for data blocks and the + // caller is either GET or MGET. + record.referenced_key = (kRefKeyPrefix + std::to_string(key_id)); + record.referenced_key_exist_in_block = true; + record.num_keys_in_block = kNumKeysInBlock; + record.referenced_data_size = kReferencedDataSize + key_id; + ASSERT_OK(writer->WriteBlockAccess( + record, record.block_key, record.cf_name, record.referenced_key)); + } + } + + BlockCacheTraceRecord GenerateAccessRecord() { + uint32_t key_id = 0; + BlockCacheTraceRecord record; + record.block_type = TraceType::kBlockTraceDataBlock; + record.block_size = kBlockSize; + record.block_key = kBlockKeyPrefix + std::to_string(key_id); + record.access_timestamp = clock_->NowMicros(); + record.cf_id = kCFId; + record.cf_name = kDefaultColumnFamilyName; + record.caller = GetCaller(key_id); + record.level = kLevel; + record.sst_fd_number = kSSTFDNumber + key_id; + record.is_cache_hit = false; + record.no_insert = false; + record.referenced_key = kRefKeyPrefix + std::to_string(key_id); + record.referenced_key_exist_in_block = true; + record.num_keys_in_block = kNumKeysInBlock; + return record; + } + + void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id, + TraceType block_type, uint32_t nblocks) { + assert(reader); + for (uint32_t i = 0; i < nblocks; i++) { + uint32_t key_id = from_key_id + i; + BlockCacheTraceRecord record; + ASSERT_OK(reader->ReadAccess(&record)); + ASSERT_EQ(block_type, record.block_type); + ASSERT_EQ(kBlockSize + key_id, record.block_size); + ASSERT_EQ(kBlockKeyPrefix + std::to_string(key_id), record.block_key); + ASSERT_EQ(kCFId, record.cf_id); + ASSERT_EQ(kDefaultColumnFamilyName, record.cf_name); + ASSERT_EQ(GetCaller(key_id), record.caller); + ASSERT_EQ(kLevel, record.level); + ASSERT_EQ(kSSTFDNumber + key_id, record.sst_fd_number); + ASSERT_FALSE(record.is_cache_hit); + ASSERT_FALSE(record.no_insert); + if (record.caller == TableReaderCaller::kUserGet || + record.caller == TableReaderCaller::kUserMultiGet) { + ASSERT_EQ(key_id + 1, record.get_id); + ASSERT_TRUE(record.get_from_user_specified_snapshot); + ASSERT_EQ(kRefKeyPrefix + std::to_string(key_id), + record.referenced_key); + } else { + ASSERT_EQ(BlockCacheTraceHelper::kReservedGetId, record.get_id); + ASSERT_FALSE(record.get_from_user_specified_snapshot); + ASSERT_EQ("", record.referenced_key); + } + if (block_type == TraceType::kBlockTraceDataBlock && + (record.caller == TableReaderCaller::kUserGet || + record.caller == TableReaderCaller::kUserMultiGet)) { + ASSERT_TRUE(record.referenced_key_exist_in_block); + ASSERT_EQ(kNumKeysInBlock, record.num_keys_in_block); + ASSERT_EQ(kReferencedDataSize + key_id, record.referenced_data_size); + continue; + } + ASSERT_FALSE(record.referenced_key_exist_in_block); + ASSERT_EQ(0, record.num_keys_in_block); + ASSERT_EQ(0, record.referenced_data_size); + } + } + + Env* env_; + SystemClock* clock_; + EnvOptions env_options_; + std::string trace_file_path_; + std::string test_path_; +}; + +TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTracer writer; + // The record should be written to the trace_file since StartTrace is not + // called. + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains nothing. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_NOK(reader.ReadHeader(&header)); + } +} + +TEST_F(BlockCacheTracerTest, AtomicWrite) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + BlockCacheTraceWriterOptions trace_writer_opt; + BlockCacheTraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer = + NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt, + std::move(trace_writer)); + ASSERT_NE(block_cache_trace_writer, nullptr); + BlockCacheTracer writer; + ASSERT_OK( + writer.StartTrace(trace_opt, std::move(block_cache_trace_writer))); + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, ConsecutiveStartTrace) { + BlockCacheTraceWriterOptions trace_writer_opt; + BlockCacheTraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK( + NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); + std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer = + NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt, + std::move(trace_writer)); + ASSERT_NE(block_cache_trace_writer, nullptr); + BlockCacheTracer writer; + ASSERT_OK(writer.StartTrace(trace_opt, std::move(block_cache_trace_writer))); + ASSERT_NOK(writer.StartTrace(trace_opt, std::move(block_cache_trace_writer))); + ASSERT_OK(env_->FileExists(trace_file_path_)); +} + +TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + { + BlockCacheTraceWriterOptions trace_writer_opt; + BlockCacheTraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer = + NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt, + std::move(trace_writer)); + ASSERT_NE(block_cache_trace_writer, nullptr); + BlockCacheTracer writer; + ASSERT_OK( + writer.StartTrace(trace_opt, std::move(block_cache_trace_writer))); + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + writer.EndTrace(); + // Write the record again. This time the record should not be written since + // EndTrace is called. + ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, + record.referenced_key)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, NextGetId) { + BlockCacheTracer writer; + { + BlockCacheTraceWriterOptions trace_writer_opt; + BlockCacheTraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer = + NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt, + std::move(trace_writer)); + ASSERT_NE(block_cache_trace_writer, nullptr); + // next get id should always return 0 before we call StartTrace. + ASSERT_EQ(0, writer.NextGetId()); + ASSERT_EQ(0, writer.NextGetId()); + ASSERT_OK( + writer.StartTrace(trace_opt, std::move(block_cache_trace_writer))); + ASSERT_EQ(1, writer.NextGetId()); + ASSERT_EQ(2, writer.NextGetId()); + writer.EndTrace(); + // next get id should return 0. + ASSERT_EQ(0, writer.NextGetId()); + } + + // Start trace again and next get id should return 1. + { + BlockCacheTraceWriterOptions trace_writer_opt; + BlockCacheTraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer = + NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt, + std::move(trace_writer)); + ASSERT_NE(block_cache_trace_writer, nullptr); + ASSERT_OK( + writer.StartTrace(trace_opt, std::move(block_cache_trace_writer))); + ASSERT_EQ(1, writer.NextGetId()); + } +} + +TEST_F(BlockCacheTracerTest, MixedBlocks) { + { + // Generate a trace file containing a mix of blocks. + BlockCacheTraceWriterOptions trace_writer_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer = + NewBlockCacheTraceWriter(env_->GetSystemClock().get(), trace_writer_opt, + std::move(trace_writer)); + ASSERT_NE(block_cache_trace_writer, nullptr); + ASSERT_OK(block_cache_trace_writer->WriteHeader()); + // Write blocks of different types. + WriteBlockAccess(block_cache_trace_writer.get(), 0, + TraceType::kBlockTraceUncompressionDictBlock, 10); + WriteBlockAccess(block_cache_trace_writer.get(), 10, + TraceType::kBlockTraceDataBlock, 10); + WriteBlockAccess(block_cache_trace_writer.get(), 20, + TraceType::kBlockTraceFilterBlock, 10); + WriteBlockAccess(block_cache_trace_writer.get(), 30, + TraceType::kBlockTraceIndexBlock, 10); + WriteBlockAccess(block_cache_trace_writer.get(), 40, + TraceType::kBlockTraceRangeDeletionBlock, 10); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + + { + // Verify trace file is generated correctly. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + // Read blocks. + VerifyAccess(&reader, 0, TraceType::kBlockTraceUncompressionDictBlock, 10); + VerifyAccess(&reader, 10, TraceType::kBlockTraceDataBlock, 10); + VerifyAccess(&reader, 20, TraceType::kBlockTraceFilterBlock, 10); + VerifyAccess(&reader, 30, TraceType::kBlockTraceIndexBlock, 10); + VerifyAccess(&reader, 40, TraceType::kBlockTraceRangeDeletionBlock, 10); + // Read one more record should report an error. + BlockCacheTraceRecord record; + ASSERT_NOK(reader.ReadAccess(&record)); + } +} + +TEST_F(BlockCacheTracerTest, HumanReadableTrace) { + BlockCacheTraceRecord record = GenerateAccessRecord(); + record.get_id = 1; + record.referenced_key = ""; + record.caller = TableReaderCaller::kUserGet; + record.get_from_user_specified_snapshot = true; + record.referenced_data_size = kReferencedDataSize; + PutFixed32(&record.referenced_key, 111); + PutLengthPrefixedSlice(&record.referenced_key, "get_key"); + PutFixed64(&record.referenced_key, 2 << 8); + PutLengthPrefixedSlice(&record.block_key, "block_key"); + PutVarint64(&record.block_key, 333); + { + // Generate a human readable trace file. + BlockCacheHumanReadableTraceWriter writer; + ASSERT_OK(writer.NewWritableFile(trace_file_path_, env_)); + ASSERT_OK(writer.WriteHumanReadableTraceRecord(record, 1, 1)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + BlockCacheHumanReadableTraceReader reader(trace_file_path_); + BlockCacheTraceHeader header; + BlockCacheTraceRecord read_record; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_OK(reader.ReadAccess(&read_record)); + ASSERT_EQ(TraceType::kBlockTraceDataBlock, read_record.block_type); + ASSERT_EQ(kBlockSize, read_record.block_size); + ASSERT_EQ(kCFId, read_record.cf_id); + ASSERT_EQ(kDefaultColumnFamilyName, read_record.cf_name); + ASSERT_EQ(TableReaderCaller::kUserGet, read_record.caller); + ASSERT_EQ(kLevel, read_record.level); + ASSERT_EQ(kSSTFDNumber, read_record.sst_fd_number); + ASSERT_FALSE(read_record.is_cache_hit); + ASSERT_FALSE(read_record.no_insert); + ASSERT_EQ(1, read_record.get_id); + ASSERT_TRUE(read_record.get_from_user_specified_snapshot); + ASSERT_TRUE(read_record.referenced_key_exist_in_block); + ASSERT_EQ(kNumKeysInBlock, read_record.num_keys_in_block); + ASSERT_EQ(kReferencedDataSize, read_record.referenced_data_size); + ASSERT_EQ(record.block_key.size(), read_record.block_key.size()); + ASSERT_EQ(record.referenced_key.size(), record.referenced_key.size()); + ASSERT_EQ(112, BlockCacheTraceHelper::GetTableId(read_record)); + ASSERT_EQ(3, BlockCacheTraceHelper::GetSequenceNumber(read_record)); + ASSERT_EQ(333, BlockCacheTraceHelper::GetBlockOffsetInFile(read_record)); + // Read again should fail. + ASSERT_NOK(reader.ReadAccess(&read_record)); + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/trace_replay/io_tracer.cc b/src/rocksdb/trace_replay/io_tracer.cc new file mode 100644 index 000000000..a860130f8 --- /dev/null +++ b/src/rocksdb/trace_replay/io_tracer.cc @@ -0,0 +1,303 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/io_tracer.h" + +#include <cinttypes> +#include <cstdio> +#include <cstdlib> + +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "rocksdb/slice.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/trace_reader_writer.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +IOTraceWriter::IOTraceWriter(SystemClock* clock, + const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) + : clock_(clock), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)) {} + +Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record, + IODebugContext* dbg) { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + if (trace_file_size > trace_options_.max_trace_file_size) { + return Status::OK(); + } + Trace trace; + trace.ts = record.access_timestamp; + trace.type = record.trace_type; + PutFixed64(&trace.payload, record.io_op_data); + Slice file_operation(record.file_operation); + PutLengthPrefixedSlice(&trace.payload, file_operation); + PutFixed64(&trace.payload, record.latency); + Slice io_status(record.io_status); + PutLengthPrefixedSlice(&trace.payload, io_status); + Slice file_name(record.file_name); + PutLengthPrefixedSlice(&trace.payload, file_name); + + // Each bit in io_op_data stores which corresponding info from IOTraceOp will + // be added in the trace. Foreg, if bit at position 1 is set then + // IOTraceOp::kIOLen (length) will be logged in the record (Since + // IOTraceOp::kIOLen = 1 in the enum). So find all the set positions in + // io_op_data one by one and, update corresponsing info in the trace record, + // unset that bit to find other set bits until io_op_data = 0. + /* Write remaining options based on io_op_data set by file operation */ + int64_t io_op_data = static_cast<int64_t>(record.io_op_data); + while (io_op_data) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast<uint32_t>(log2(io_op_data & -io_op_data)); + switch (set_pos) { + case IOTraceOp::kIOFileSize: + PutFixed64(&trace.payload, record.file_size); + break; + case IOTraceOp::kIOLen: + PutFixed64(&trace.payload, record.len); + break; + case IOTraceOp::kIOOffset: + PutFixed64(&trace.payload, record.offset); + break; + default: + assert(false); + } + // unset the rightmost bit. + io_op_data &= (io_op_data - 1); + } + + int64_t trace_data = 0; + if (dbg) { + trace_data = static_cast<int64_t>(dbg->trace_data); + } + PutFixed64(&trace.payload, trace_data); + while (trace_data) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast<uint32_t>(log2(trace_data & -trace_data)); + switch (set_pos) { + case IODebugContext::TraceData::kRequestID: { + Slice request_id(dbg->request_id); + PutLengthPrefixedSlice(&trace.payload, request_id); + } break; + default: + assert(false); + } + // unset the rightmost bit. + trace_data &= (trace_data - 1); + } + + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +Status IOTraceWriter::WriteHeader() { + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = TraceType::kTraceBegin; + PutLengthPrefixedSlice(&trace.payload, kTraceMagic); + PutFixed32(&trace.payload, kMajorVersion); + PutFixed32(&trace.payload, kMinorVersion); + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +IOTraceReader::IOTraceReader(std::unique_ptr<TraceReader>&& reader) + : trace_reader_(std::move(reader)) {} + +Status IOTraceReader::ReadHeader(IOTraceHeader* header) { + assert(header != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + header->start_time = trace.ts; + Slice enc_slice = Slice(trace.payload); + Slice magic_number; + if (!GetLengthPrefixedSlice(&enc_slice, &magic_number)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read the magic number."); + } + if (magic_number.ToString() != kTraceMagic) { + return Status::Corruption( + "Corrupted header in the trace file: Magic number does not match."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb major " + "version number."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb minor " + "version number."); + } + // We should have retrieved all information in the header. + if (!enc_slice.empty()) { + return Status::Corruption( + "Corrupted header in the trace file: The length of header is too " + "long."); + } + return Status::OK(); +} + +Status IOTraceReader::ReadIOOp(IOTraceRecord* record) { + assert(record); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + record->access_timestamp = trace.ts; + record->trace_type = trace.type; + Slice enc_slice = Slice(trace.payload); + + if (!GetFixed64(&enc_slice, &record->io_op_data)) { + return Status::Incomplete( + "Incomplete access record: Failed to read trace data."); + } + Slice file_operation; + if (!GetLengthPrefixedSlice(&enc_slice, &file_operation)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file operation."); + } + record->file_operation = file_operation.ToString(); + if (!GetFixed64(&enc_slice, &record->latency)) { + return Status::Incomplete( + "Incomplete access record: Failed to read latency."); + } + Slice io_status; + if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) { + return Status::Incomplete( + "Incomplete access record: Failed to read IO status."); + } + record->io_status = io_status.ToString(); + Slice file_name; + if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file name."); + } + record->file_name = file_name.ToString(); + + // Each bit in io_op_data stores which corresponding info from IOTraceOp will + // be added in the trace. Foreg, if bit at position 1 is set then + // IOTraceOp::kIOLen (length) will be logged in the record (Since + // IOTraceOp::kIOLen = 1 in the enum). So find all the set positions in + // io_op_data one by one and, update corresponsing info in the trace record, + // unset that bit to find other set bits until io_op_data = 0. + /* Read remaining options based on io_op_data set by file operation */ + // Assuming 63 bits will be used at max. + int64_t io_op_data = static_cast<int64_t>(record->io_op_data); + while (io_op_data) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast<uint32_t>(log2(io_op_data & -io_op_data)); + switch (set_pos) { + case IOTraceOp::kIOFileSize: + if (!GetFixed64(&enc_slice, &record->file_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file size."); + } + break; + case IOTraceOp::kIOLen: + if (!GetFixed64(&enc_slice, &record->len)) { + return Status::Incomplete( + "Incomplete access record: Failed to read length."); + } + break; + case IOTraceOp::kIOOffset: + if (!GetFixed64(&enc_slice, &record->offset)) { + return Status::Incomplete( + "Incomplete access record: Failed to read offset."); + } + break; + default: + assert(false); + } + // unset the rightmost bit. + io_op_data &= (io_op_data - 1); + } + + if (!GetFixed64(&enc_slice, &record->trace_data)) { + return Status::Incomplete( + "Incomplete access record: Failed to read trace op."); + } + int64_t trace_data = static_cast<int64_t>(record->trace_data); + while (trace_data) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast<uint32_t>(log2(trace_data & -trace_data)); + switch (set_pos) { + case IODebugContext::TraceData::kRequestID: { + Slice request_id; + if (!GetLengthPrefixedSlice(&enc_slice, &request_id)) { + return Status::Incomplete( + "Incomplete access record: Failed to request id."); + } + record->request_id = request_id.ToString(); + } break; + default: + assert(false); + } + // unset the rightmost bit. + trace_data &= (trace_data - 1); + } + + return Status::OK(); +} + +IOTracer::IOTracer() : tracing_enabled(false) { writer_.store(nullptr); } + +IOTracer::~IOTracer() { EndIOTrace(); } + +Status IOTracer::StartIOTrace(SystemClock* clock, + const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (writer_.load()) { + return Status::Busy(); + } + trace_options_ = trace_options; + writer_.store( + new IOTraceWriter(clock, trace_options, std::move(trace_writer))); + tracing_enabled = true; + return writer_.load()->WriteHeader(); +} + +void IOTracer::EndIOTrace() { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return; + } + delete writer_.load(); + writer_.store(nullptr); + tracing_enabled = false; +} + +void IOTracer::WriteIOOp(const IOTraceRecord& record, IODebugContext* dbg) { + if (!writer_.load()) { + return; + } + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return; + } + writer_.load()->WriteIOOp(record, dbg).PermitUncheckedError(); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/io_tracer.h b/src/rocksdb/trace_replay/io_tracer.h new file mode 100644 index 000000000..3fc7cdba0 --- /dev/null +++ b/src/rocksdb/trace_replay/io_tracer.h @@ -0,0 +1,185 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <fstream> + +#include "monitoring/instrumented_mutex.h" +#include "port/lang.h" +#include "rocksdb/file_system.h" +#include "rocksdb/options.h" +#include "rocksdb/trace_record.h" +#include "trace_replay/trace_replay.h" + +namespace ROCKSDB_NAMESPACE { +class SystemClock; +class TraceReader; +class TraceWriter; + +/* In order to log new data in trace record for specified operations, do + following: + 1. Add new data in IOTraceOP (say kIONewData= 3) + 2. Log it in IOTraceWriter::WriteIOOp, and read that in + IOTraceReader::ReadIOOp and + IOTraceRecordParser::PrintHumanReadableIOTraceRecord in the switch case. + 3. In the FileSystemTracer APIs where this data will be logged with, update + io_op_data |= (1 << IOTraceOp::kIONewData). +*/ +enum IOTraceOp : char { + // The value of each enum represents the bitwise position for + // IOTraceRecord.io_op_data. + kIOFileSize = 0, + kIOLen = 1, + kIOOffset = 2, +}; + +struct IOTraceRecord { + // Required fields for all accesses. + uint64_t access_timestamp = 0; + TraceType trace_type = TraceType::kTraceMax; + // Each bit in io_op_data stores which corresponding info from IOTraceOp will + // be added in the trace. Foreg, if bit at position 1 is set then + // IOTraceOp::kIOLen (length) will be logged in the record. + uint64_t io_op_data = 0; + std::string file_operation; + uint64_t latency = 0; + std::string io_status; + // Stores file name instead of full path. + std::string file_name; + + // Fields added to record based on IO operation. + uint64_t len = 0; + uint64_t offset = 0; + uint64_t file_size = 0; + + // Additional information passed in IODebugContext. + uint64_t trace_data = 0; + std::string request_id; + + IOTraceRecord() {} + + IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, + const uint64_t& _io_op_data, const std::string& _file_operation, + const uint64_t& _latency, const std::string& _io_status, + const std::string& _file_name, const uint64_t& _file_size = 0) + : access_timestamp(_access_timestamp), + trace_type(_trace_type), + io_op_data(_io_op_data), + file_operation(_file_operation), + latency(_latency), + io_status(_io_status), + file_name(_file_name), + file_size(_file_size) {} + + IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, + const uint64_t& _io_op_data, const std::string& _file_operation, + const uint64_t& _latency, const std::string& _io_status, + const std::string& _file_name, const uint64_t& _len, + const uint64_t& _offset) + : access_timestamp(_access_timestamp), + trace_type(_trace_type), + io_op_data(_io_op_data), + file_operation(_file_operation), + latency(_latency), + io_status(_io_status), + file_name(_file_name), + len(_len), + offset(_offset) {} +}; + +struct IOTraceHeader { + uint64_t start_time; + uint32_t rocksdb_major_version; + uint32_t rocksdb_minor_version; +}; + +// IOTraceWriter writes IO operation as a single trace. Each trace will have a +// timestamp and type, followed by the trace payload. +class IOTraceWriter { + public: + IOTraceWriter(SystemClock* clock, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + ~IOTraceWriter() = default; + // No copy and move. + IOTraceWriter(const IOTraceWriter&) = delete; + IOTraceWriter& operator=(const IOTraceWriter&) = delete; + IOTraceWriter(IOTraceWriter&&) = delete; + IOTraceWriter& operator=(IOTraceWriter&&) = delete; + + Status WriteIOOp(const IOTraceRecord& record, IODebugContext* dbg); + + // Write a trace header at the beginning, typically on initiating a trace, + // with some metadata like a magic number and RocksDB version. + Status WriteHeader(); + + private: + SystemClock* clock_; + TraceOptions trace_options_; + std::unique_ptr<TraceWriter> trace_writer_; +}; + +// IOTraceReader helps read the trace file generated by IOTraceWriter. +class IOTraceReader { + public: + explicit IOTraceReader(std::unique_ptr<TraceReader>&& reader); + ~IOTraceReader() = default; + // No copy and move. + IOTraceReader(const IOTraceReader&) = delete; + IOTraceReader& operator=(const IOTraceReader&) = delete; + IOTraceReader(IOTraceReader&&) = delete; + IOTraceReader& operator=(IOTraceReader&&) = delete; + + Status ReadHeader(IOTraceHeader* header); + + Status ReadIOOp(IOTraceRecord* record); + + private: + std::unique_ptr<TraceReader> trace_reader_; +}; + +// An IO tracer. It uses IOTraceWriter to write the access record to the +// trace file. +class IOTracer { + public: + IOTracer(); + ~IOTracer(); + // No copy and move. + IOTracer(const IOTracer&) = delete; + IOTracer& operator=(const IOTracer&) = delete; + IOTracer(IOTracer&&) = delete; + IOTracer& operator=(IOTracer&&) = delete; + + // no_sanitize is added for tracing_enabled. writer_ is protected under mutex + // so even if user call Start/EndIOTrace and tracing_enabled is not updated in + // the meanwhile, WriteIOOp will anyways check the writer_ protected under + // mutex and ignore the operation if writer_is null. So its ok if + // tracing_enabled shows non updated value. + + // Start writing IO operations to the trace_writer. + TSAN_SUPPRESSION Status + StartIOTrace(SystemClock* clock, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + + // Stop writing IO operations to the trace_writer. + TSAN_SUPPRESSION void EndIOTrace(); + + TSAN_SUPPRESSION bool is_tracing_enabled() const { return tracing_enabled; } + + void WriteIOOp(const IOTraceRecord& record, IODebugContext* dbg); + + private: + TraceOptions trace_options_; + // A mutex protects the writer_. + InstrumentedMutex trace_writer_mutex_; + std::atomic<IOTraceWriter*> writer_; + // bool tracing_enabled is added to avoid costly operation of checking atomic + // variable 'writer_' is nullptr or not in is_tracing_enabled(). + // is_tracing_enabled() is invoked multiple times by FileSystem classes. + bool tracing_enabled; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/io_tracer_test.cc b/src/rocksdb/trace_replay/io_tracer_test.cc new file mode 100644 index 000000000..be3af4fb3 --- /dev/null +++ b/src/rocksdb/trace_replay/io_tracer_test.cc @@ -0,0 +1,353 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/io_tracer.h" + +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +const std::string kDummyFile = "/dummy/file"; + +} // namespace + +class IOTracerTest : public testing::Test { + public: + IOTracerTest() { + test_path_ = test::PerThreadDBPath("io_tracer_test"); + env_ = ROCKSDB_NAMESPACE::Env::Default(); + clock_ = env_->GetSystemClock().get(); + EXPECT_OK(env_->CreateDir(test_path_)); + trace_file_path_ = test_path_ + "/io_trace"; + } + + ~IOTracerTest() override { + EXPECT_OK(env_->DeleteFile(trace_file_path_)); + EXPECT_OK(env_->DeleteDir(test_path_)); + } + + std::string GetFileOperation(uint64_t id) { + id = id % 4; + switch (id) { + case 0: + return "CreateDir"; + case 1: + return "GetChildren"; + case 2: + return "FileSize"; + case 3: + return "DeleteDir"; + default: + assert(false); + } + return ""; + } + + void WriteIOOp(IOTraceWriter* writer, uint64_t nrecords) { + assert(writer); + for (uint64_t i = 0; i < nrecords; i++) { + IOTraceRecord record; + record.io_op_data = 0; + record.trace_type = TraceType::kIOTracer; + record.io_op_data |= (1 << IOTraceOp::kIOLen); + record.io_op_data |= (1 << IOTraceOp::kIOOffset); + record.file_operation = GetFileOperation(i); + record.io_status = IOStatus::OK().ToString(); + record.file_name = kDummyFile + std::to_string(i); + record.len = i; + record.offset = i + 20; + EXPECT_OK(writer->WriteIOOp(record, nullptr)); + } + } + + void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) { + assert(reader); + for (uint32_t i = 0; i < nrecords; i++) { + IOTraceRecord record; + ASSERT_OK(reader->ReadIOOp(&record)); + ASSERT_EQ(record.file_operation, GetFileOperation(i)); + ASSERT_EQ(record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(record.len, i); + ASSERT_EQ(record.offset, i + 20); + } + } + + Env* env_; + SystemClock* clock_; + EnvOptions env_options_; + std::string trace_file_path_; + std::string test_path_; +}; + +TEST_F(IOTracerTest, MultipleRecordsWithDifferentIOOpOptions) { + std::string file_name = kDummyFile + std::to_string(5); + { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer))); + + // Write general record. + IOTraceRecord record0(0, TraceType::kIOTracer, 0 /*io_op_data*/, + GetFileOperation(0), 155 /*latency*/, + IOStatus::OK().ToString(), file_name); + writer.WriteIOOp(record0, nullptr); + + // Write record with FileSize. + uint64_t io_op_data = 0; + io_op_data |= (1 << IOTraceOp::kIOFileSize); + IOTraceRecord record1(0, TraceType::kIOTracer, io_op_data, + GetFileOperation(1), 10 /*latency*/, + IOStatus::OK().ToString(), file_name, + 256 /*file_size*/); + writer.WriteIOOp(record1, nullptr); + + // Write record with Length. + io_op_data = 0; + io_op_data |= (1 << IOTraceOp::kIOLen); + IOTraceRecord record2(0, TraceType::kIOTracer, io_op_data, + GetFileOperation(2), 10 /*latency*/, + IOStatus::OK().ToString(), file_name, 100 /*length*/, + 200 /*offset*/); + writer.WriteIOOp(record2, nullptr); + + // Write record with Length and offset. + io_op_data = 0; + io_op_data |= (1 << IOTraceOp::kIOLen); + io_op_data |= (1 << IOTraceOp::kIOOffset); + IOTraceRecord record3(0, TraceType::kIOTracer, io_op_data, + GetFileOperation(3), 10 /*latency*/, + IOStatus::OK().ToString(), file_name, 120 /*length*/, + 17 /*offset*/); + writer.WriteIOOp(record3, nullptr); + + // Write record with offset. + io_op_data = 0; + io_op_data |= (1 << IOTraceOp::kIOOffset); + IOTraceRecord record4(0, TraceType::kIOTracer, io_op_data, + GetFileOperation(4), 10 /*latency*/, + IOStatus::OK().ToString(), file_name, 13 /*length*/, + 50 /*offset*/); + writer.WriteIOOp(record4, nullptr); + + // Write record with IODebugContext. + io_op_data = 0; + IODebugContext dbg; + dbg.SetRequestId("request_id_1"); + IOTraceRecord record5(0, TraceType::kIOTracer, io_op_data, + GetFileOperation(5), 10 /*latency*/, + IOStatus::OK().ToString(), file_name); + writer.WriteIOOp(record5, &dbg); + + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file is generated correctly. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + + // Read general record. + IOTraceRecord record0; + ASSERT_OK(reader.ReadIOOp(&record0)); + ASSERT_EQ(record0.file_operation, GetFileOperation(0)); + ASSERT_EQ(record0.latency, 155); + ASSERT_EQ(record0.file_name, file_name); + + // Read record with FileSize. + IOTraceRecord record1; + ASSERT_OK(reader.ReadIOOp(&record1)); + ASSERT_EQ(record1.file_size, 256); + ASSERT_EQ(record1.len, 0); + ASSERT_EQ(record1.offset, 0); + + // Read record with Length. + IOTraceRecord record2; + ASSERT_OK(reader.ReadIOOp(&record2)); + ASSERT_EQ(record2.len, 100); + ASSERT_EQ(record2.file_size, 0); + ASSERT_EQ(record2.offset, 0); + + // Read record with Length and offset. + IOTraceRecord record3; + ASSERT_OK(reader.ReadIOOp(&record3)); + ASSERT_EQ(record3.len, 120); + ASSERT_EQ(record3.file_size, 0); + ASSERT_EQ(record3.offset, 17); + + // Read record with offset. + IOTraceRecord record4; + ASSERT_OK(reader.ReadIOOp(&record4)); + ASSERT_EQ(record4.len, 0); + ASSERT_EQ(record4.file_size, 0); + ASSERT_EQ(record4.offset, 50); + + IOTraceRecord record5; + ASSERT_OK(reader.ReadIOOp(&record5)); + ASSERT_EQ(record5.len, 0); + ASSERT_EQ(record5.file_size, 0); + ASSERT_EQ(record5.offset, 0); + ASSERT_EQ(record5.request_id, "request_id_1"); + // Read one more record and it should report error. + IOTraceRecord record6; + ASSERT_NOK(reader.ReadIOOp(&record6)); + } +} + +TEST_F(IOTracerTest, AtomicWrite) { + std::string file_name = kDummyFile + std::to_string(0); + { + IOTraceRecord record(0, TraceType::kIOTracer, 0 /*io_op_data*/, + GetFileOperation(0), 10 /*latency*/, + IOStatus::OK().ToString(), file_name); + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer))); + writer.WriteIOOp(record, nullptr); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + // Read record and verify data. + IOTraceRecord access_record; + ASSERT_OK(reader.ReadIOOp(&access_record)); + ASSERT_EQ(access_record.file_operation, GetFileOperation(0)); + ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(access_record.file_name, file_name); + ASSERT_NOK(reader.ReadIOOp(&access_record)); + } +} + +TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) { + std::string file_name = kDummyFile + std::to_string(0); + { + IOTraceRecord record(0, TraceType::kIOTracer, 0 /*io_op_data*/, + GetFileOperation(0), 0, IOStatus::OK().ToString(), + file_name); + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + // The record should not be written to the trace_file since StartIOTrace is + // not called. + writer.WriteIOOp(record, nullptr); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains nothing. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_NOK(reader.ReadHeader(&header)); + } +} + +TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) { + std::string file_name = kDummyFile + std::to_string(0); + { + uint64_t io_op_data = 0; + io_op_data |= (1 << IOTraceOp::kIOFileSize); + IOTraceRecord record( + 0, TraceType::kIOTracer, io_op_data, GetFileOperation(2), 0 /*latency*/, + IOStatus::OK().ToString(), file_name, 10 /*file_size*/); + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer))); + writer.WriteIOOp(record, nullptr); + writer.EndIOTrace(); + // Write the record again. This time the record should not be written since + // EndIOTrace is called. + writer.WriteIOOp(record, nullptr); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + + IOTraceRecord access_record; + ASSERT_OK(reader.ReadIOOp(&access_record)); + ASSERT_EQ(access_record.file_operation, GetFileOperation(2)); + ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(access_record.file_size, 10); + // No more record. + ASSERT_NOK(reader.ReadIOOp(&access_record)); + } +} + +TEST_F(IOTracerTest, AtomicMultipleWrites) { + { + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTraceWriter writer(clock_, trace_opt, std::move(trace_writer)); + ASSERT_OK(writer.WriteHeader()); + // Write 10 records + WriteIOOp(&writer, 10); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + + { + // Verify trace file is generated correctly. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version)); + // Read 10 records. + VerifyIOOp(&reader, 10); + // Read one more and record and it should report error. + IOTraceRecord record; + ASSERT_NOK(reader.ReadIOOp(&record)); + } +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/trace_replay/trace_record.cc b/src/rocksdb/trace_replay/trace_record.cc new file mode 100644 index 000000000..21df0275d --- /dev/null +++ b/src/rocksdb/trace_replay/trace_record.cc @@ -0,0 +1,206 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/trace_record.h" + +#include <utility> + +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_record_result.h" +#include "trace_replay/trace_record_handler.h" + +namespace ROCKSDB_NAMESPACE { + +// TraceRecord +TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {} + +uint64_t TraceRecord::GetTimestamp() const { return timestamp_; } + +TraceRecord::Handler* TraceRecord::NewExecutionHandler( + DB* db, const std::vector<ColumnFamilyHandle*>& handles) { + return new TraceExecutionHandler(db, handles); +} + +// QueryTraceRecord +QueryTraceRecord::QueryTraceRecord(uint64_t timestamp) + : TraceRecord(timestamp) {} + +// WriteQueryTraceRecord +WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, + uint64_t timestamp) + : QueryTraceRecord(timestamp), rep_(std::move(write_batch_rep)) {} + +WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep, + uint64_t timestamp) + : QueryTraceRecord(timestamp) { + rep_.PinSelf(write_batch_rep); +} + +WriteQueryTraceRecord::~WriteQueryTraceRecord() { rep_.clear(); } + +Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); } + +Status WriteQueryTraceRecord::Accept( + Handler* handler, std::unique_ptr<TraceRecordResult>* result) { + assert(handler != nullptr); + return handler->Handle(*this, result); +} + +// GetQueryTraceRecord +GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id, + PinnableSlice&& key, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + cf_id_(column_family_id), + key_(std::move(key)) {} + +GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id, + const std::string& key, + uint64_t timestamp) + : QueryTraceRecord(timestamp), cf_id_(column_family_id) { + key_.PinSelf(key); +} + +GetQueryTraceRecord::~GetQueryTraceRecord() { key_.clear(); } + +uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; } + +Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); } + +Status GetQueryTraceRecord::Accept(Handler* handler, + std::unique_ptr<TraceRecordResult>* result) { + assert(handler != nullptr); + return handler->Handle(*this, result); +} + +// IteratorQueryTraceRecord +IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) + : QueryTraceRecord(timestamp) {} + +IteratorQueryTraceRecord::IteratorQueryTraceRecord(PinnableSlice&& lower_bound, + PinnableSlice&& upper_bound, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + lower_(std::move(lower_bound)), + upper_(std::move(upper_bound)) {} + +IteratorQueryTraceRecord::IteratorQueryTraceRecord( + const std::string& lower_bound, const std::string& upper_bound, + uint64_t timestamp) + : QueryTraceRecord(timestamp) { + lower_.PinSelf(lower_bound); + upper_.PinSelf(upper_bound); +} + +IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {} + +Slice IteratorQueryTraceRecord::GetLowerBound() const { return Slice(lower_); } + +Slice IteratorQueryTraceRecord::GetUpperBound() const { return Slice(upper_); } + +// IteratorSeekQueryTraceRecord +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, + uint64_t timestamp) + : IteratorQueryTraceRecord(timestamp), + type_(seek_type), + cf_id_(column_family_id), + key_(std::move(key)) {} + +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, const std::string& key, + uint64_t timestamp) + : IteratorQueryTraceRecord(timestamp), + type_(seek_type), + cf_id_(column_family_id) { + key_.PinSelf(key); +} + +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, + PinnableSlice&& lower_bound, PinnableSlice&& upper_bound, + uint64_t timestamp) + : IteratorQueryTraceRecord(std::move(lower_bound), std::move(upper_bound), + timestamp), + type_(seek_type), + cf_id_(column_family_id), + key_(std::move(key)) {} + +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, const std::string& key, + const std::string& lower_bound, const std::string& upper_bound, + uint64_t timestamp) + : IteratorQueryTraceRecord(lower_bound, upper_bound, timestamp), + type_(seek_type), + cf_id_(column_family_id) { + key_.PinSelf(key); +} + +IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); } + +TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { + return static_cast<TraceType>(type_); +} + +IteratorSeekQueryTraceRecord::SeekType +IteratorSeekQueryTraceRecord::GetSeekType() const { + return type_; +} + +uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const { + return cf_id_; +} + +Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); } + +Status IteratorSeekQueryTraceRecord::Accept( + Handler* handler, std::unique_ptr<TraceRecordResult>* result) { + assert(handler != nullptr); + return handler->Handle(*this, result); +} + +// MultiGetQueryTraceRecord +MultiGetQueryTraceRecord::MultiGetQueryTraceRecord( + std::vector<uint32_t> column_family_ids, std::vector<PinnableSlice>&& keys, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + cf_ids_(column_family_ids), + keys_(std::move(keys)) {} + +MultiGetQueryTraceRecord::MultiGetQueryTraceRecord( + std::vector<uint32_t> column_family_ids, + const std::vector<std::string>& keys, uint64_t timestamp) + : QueryTraceRecord(timestamp), cf_ids_(column_family_ids) { + keys_.reserve(keys.size()); + for (const std::string& key : keys) { + PinnableSlice ps; + ps.PinSelf(key); + keys_.push_back(std::move(ps)); + } +} + +MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() { + cf_ids_.clear(); + keys_.clear(); +} + +std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const { + return cf_ids_; +} + +std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const { + return std::vector<Slice>(keys_.begin(), keys_.end()); +} + +Status MultiGetQueryTraceRecord::Accept( + Handler* handler, std::unique_ptr<TraceRecordResult>* result) { + assert(handler != nullptr); + return handler->Handle(*this, result); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/trace_record_handler.cc b/src/rocksdb/trace_replay/trace_record_handler.cc new file mode 100644 index 000000000..ca179e870 --- /dev/null +++ b/src/rocksdb/trace_replay/trace_record_handler.cc @@ -0,0 +1,190 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/trace_record_handler.h" + +#include "rocksdb/iterator.h" +#include "rocksdb/trace_record_result.h" +#include "rocksdb/write_batch.h" + +namespace ROCKSDB_NAMESPACE { + +// TraceExecutionHandler +TraceExecutionHandler::TraceExecutionHandler( + DB* db, const std::vector<ColumnFamilyHandle*>& handles) + : TraceRecord::Handler(), + db_(db), + write_opts_(WriteOptions()), + read_opts_(ReadOptions()) { + assert(db != nullptr); + assert(!handles.empty()); + cf_map_.reserve(handles.size()); + for (ColumnFamilyHandle* handle : handles) { + assert(handle != nullptr); + cf_map_.insert({handle->GetID(), handle}); + } + clock_ = db_->GetEnv()->GetSystemClock().get(); +} + +TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); } + +Status TraceExecutionHandler::Handle( + const WriteQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) { + if (result != nullptr) { + result->reset(nullptr); + } + uint64_t start = clock_->NowMicros(); + + WriteBatch batch(record.GetWriteBatchRep().ToString()); + Status s = db_->Write(write_opts_, &batch); + + uint64_t end = clock_->NowMicros(); + + if (s.ok() && result != nullptr) { + result->reset(new StatusOnlyTraceExecutionResult(s, start, end, + record.GetTraceType())); + } + + return s; +} + +Status TraceExecutionHandler::Handle( + const GetQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) { + if (result != nullptr) { + result->reset(nullptr); + } + auto it = cf_map_.find(record.GetColumnFamilyID()); + if (it == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + uint64_t start = clock_->NowMicros(); + + std::string value; + Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value); + + uint64_t end = clock_->NowMicros(); + + // Treat not found as ok, return other errors. + if (!s.ok() && !s.IsNotFound()) { + return s; + } + + if (result != nullptr) { + // Report the actual opetation status in TraceExecutionResult + result->reset(new SingleValueTraceExecutionResult( + std::move(s), std::move(value), start, end, record.GetTraceType())); + } + return Status::OK(); +} + +Status TraceExecutionHandler::Handle( + const IteratorSeekQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) { + if (result != nullptr) { + result->reset(nullptr); + } + auto it = cf_map_.find(record.GetColumnFamilyID()); + if (it == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + ReadOptions r_opts = read_opts_; + Slice lower = record.GetLowerBound(); + if (!lower.empty()) { + r_opts.iterate_lower_bound = &lower; + } + Slice upper = record.GetUpperBound(); + if (!upper.empty()) { + r_opts.iterate_upper_bound = &upper; + } + Iterator* single_iter = db_->NewIterator(r_opts, it->second); + + uint64_t start = clock_->NowMicros(); + + switch (record.GetSeekType()) { + case IteratorSeekQueryTraceRecord::kSeekForPrev: { + single_iter->SeekForPrev(record.GetKey()); + break; + } + default: { + single_iter->Seek(record.GetKey()); + break; + } + } + + uint64_t end = clock_->NowMicros(); + + Status s = single_iter->status(); + if (s.ok() && result != nullptr) { + if (single_iter->Valid()) { + PinnableSlice ps_key; + ps_key.PinSelf(single_iter->key()); + PinnableSlice ps_value; + ps_value.PinSelf(single_iter->value()); + result->reset(new IteratorTraceExecutionResult( + true, s, std::move(ps_key), std::move(ps_value), start, end, + record.GetTraceType())); + } else { + result->reset(new IteratorTraceExecutionResult( + false, s, "", "", start, end, record.GetTraceType())); + } + } + delete single_iter; + + return s; +} + +Status TraceExecutionHandler::Handle( + const MultiGetQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) { + if (result != nullptr) { + result->reset(nullptr); + } + std::vector<ColumnFamilyHandle*> handles; + handles.reserve(record.GetColumnFamilyIDs().size()); + for (uint32_t cf_id : record.GetColumnFamilyIDs()) { + auto it = cf_map_.find(cf_id); + if (it == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + handles.push_back(it->second); + } + + std::vector<Slice> keys = record.GetKeys(); + + if (handles.empty() || keys.empty()) { + return Status::InvalidArgument("Empty MultiGet cf_ids or keys."); + } + if (handles.size() != keys.size()) { + return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch."); + } + + uint64_t start = clock_->NowMicros(); + + std::vector<std::string> values; + std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values); + + uint64_t end = clock_->NowMicros(); + + // Treat not found as ok, return other errors. + for (const Status& s : ss) { + if (!s.ok() && !s.IsNotFound()) { + return s; + } + } + + if (result != nullptr) { + // Report the actual opetation status in TraceExecutionResult + result->reset(new MultiValuesTraceExecutionResult( + std::move(ss), std::move(values), start, end, record.GetTraceType())); + } + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/trace_record_handler.h b/src/rocksdb/trace_replay/trace_record_handler.h new file mode 100644 index 000000000..88cf317dd --- /dev/null +++ b/src/rocksdb/trace_replay/trace_record_handler.h @@ -0,0 +1,46 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <memory> +#include <unordered_map> +#include <vector> + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/trace_record.h" + +namespace ROCKSDB_NAMESPACE { + +// Handler to execute TraceRecord. +class TraceExecutionHandler : public TraceRecord::Handler { + public: + TraceExecutionHandler(DB* db, + const std::vector<ColumnFamilyHandle*>& handles); + virtual ~TraceExecutionHandler() override; + + virtual Status Handle(const WriteQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) override; + virtual Status Handle(const GetQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) override; + virtual Status Handle(const IteratorSeekQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) override; + virtual Status Handle(const MultiGetQueryTraceRecord& record, + std::unique_ptr<TraceRecordResult>* result) override; + + private: + DB* db_; + std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_; + WriteOptions write_opts_; + ReadOptions read_opts_; + SystemClock* clock_; +}; + +// To do: Handler for trace_analyzer. + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/trace_record_result.cc b/src/rocksdb/trace_replay/trace_record_result.cc new file mode 100644 index 000000000..9c0cb43ad --- /dev/null +++ b/src/rocksdb/trace_replay/trace_record_result.cc @@ -0,0 +1,146 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/trace_record_result.h" + +namespace ROCKSDB_NAMESPACE { + +// TraceRecordResult +TraceRecordResult::TraceRecordResult(TraceType trace_type) + : trace_type_(trace_type) {} + +TraceType TraceRecordResult::GetTraceType() const { return trace_type_; } + +// TraceExecutionResult +TraceExecutionResult::TraceExecutionResult(uint64_t start_timestamp, + uint64_t end_timestamp, + TraceType trace_type) + : TraceRecordResult(trace_type), + ts_start_(start_timestamp), + ts_end_(end_timestamp) { + assert(ts_start_ <= ts_end_); +} + +uint64_t TraceExecutionResult::GetStartTimestamp() const { return ts_start_; } + +uint64_t TraceExecutionResult::GetEndTimestamp() const { return ts_end_; } + +// StatusOnlyTraceExecutionResult +StatusOnlyTraceExecutionResult::StatusOnlyTraceExecutionResult( + Status status, uint64_t start_timestamp, uint64_t end_timestamp, + TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + status_(std::move(status)) {} + +const Status& StatusOnlyTraceExecutionResult::GetStatus() const { + return status_; +} + +Status StatusOnlyTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// SingleValueTraceExecutionResult +SingleValueTraceExecutionResult::SingleValueTraceExecutionResult( + Status status, const std::string& value, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + status_(std::move(status)), + value_(value) {} + +SingleValueTraceExecutionResult::SingleValueTraceExecutionResult( + Status status, std::string&& value, uint64_t start_timestamp, + uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + status_(std::move(status)), + value_(std::move(value)) {} + +SingleValueTraceExecutionResult::~SingleValueTraceExecutionResult() { + value_.clear(); +} + +const Status& SingleValueTraceExecutionResult::GetStatus() const { + return status_; +} + +const std::string& SingleValueTraceExecutionResult::GetValue() const { + return value_; +} + +Status SingleValueTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// MultiValuesTraceExecutionResult +MultiValuesTraceExecutionResult::MultiValuesTraceExecutionResult( + std::vector<Status> multi_status, std::vector<std::string> values, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + multi_status_(std::move(multi_status)), + values_(std::move(values)) {} + +MultiValuesTraceExecutionResult::~MultiValuesTraceExecutionResult() { + multi_status_.clear(); + values_.clear(); +} + +const std::vector<Status>& MultiValuesTraceExecutionResult::GetMultiStatus() + const { + return multi_status_; +} + +const std::vector<std::string>& MultiValuesTraceExecutionResult::GetValues() + const { + return values_; +} + +Status MultiValuesTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// IteratorTraceExecutionResult +IteratorTraceExecutionResult::IteratorTraceExecutionResult( + bool valid, Status status, PinnableSlice&& key, PinnableSlice&& value, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + valid_(valid), + status_(std::move(status)), + key_(std::move(key)), + value_(std::move(value)) {} + +IteratorTraceExecutionResult::IteratorTraceExecutionResult( + bool valid, Status status, const std::string& key, const std::string& value, + uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type) + : TraceExecutionResult(start_timestamp, end_timestamp, trace_type), + valid_(valid), + status_(std::move(status)) { + key_.PinSelf(key); + value_.PinSelf(value); +} + +IteratorTraceExecutionResult::~IteratorTraceExecutionResult() { + key_.clear(); + value_.clear(); +} + +bool IteratorTraceExecutionResult::GetValid() const { return valid_; } + +const Status& IteratorTraceExecutionResult::GetStatus() const { + return status_; +} + +Slice IteratorTraceExecutionResult::GetKey() const { return Slice(key_); } + +Slice IteratorTraceExecutionResult::GetValue() const { return Slice(value_); } + +Status IteratorTraceExecutionResult::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/trace_replay.cc b/src/rocksdb/trace_replay/trace_replay.cc new file mode 100644 index 000000000..37b95852b --- /dev/null +++ b/src/rocksdb/trace_replay/trace_replay.cc @@ -0,0 +1,622 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/trace_replay.h" + +#include <chrono> +#include <sstream> +#include <thread> + +#include "db/db_impl/db_impl.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/trace_reader_writer.h" +#include "rocksdb/write_batch.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +const std::string kTraceMagic = "feedcafedeadbeef"; + +namespace { +void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { + Slice buf(buffer); + GetFixed32(&buf, cf_id); + GetLengthPrefixedSlice(&buf, key); +} +} // namespace + +Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) { + if (v_string.find_first_of('.') == std::string::npos || + v_string.find_first_of('.') != v_string.find_last_of('.')) { + return Status::Corruption( + "Corrupted trace file. Incorrect version format."); + } + int tmp_num = 0; + for (int i = 0; i < static_cast<int>(v_string.size()); i++) { + if (v_string[i] == '.') { + continue; + } else if (isdigit(v_string[i])) { + tmp_num = tmp_num * 10 + (v_string[i] - '0'); + } else { + return Status::Corruption( + "Corrupted trace file. Incorrect version format"); + } + } + *v_num = tmp_num; + return Status::OK(); +} + +Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version, + int* db_version) { + std::vector<std::string> s_vec; + int begin = 0, end; + for (int i = 0; i < 3; i++) { + assert(header.payload.find("\t", begin) != std::string::npos); + end = static_cast<int>(header.payload.find("\t", begin)); + s_vec.push_back(header.payload.substr(begin, end - begin)); + begin = end + 1; + } + + std::string t_v_str, db_v_str; + assert(s_vec.size() == 3); + assert(s_vec[1].find("Trace Version: ") != std::string::npos); + t_v_str = s_vec[1].substr(15); + assert(s_vec[2].find("RocksDB Version: ") != std::string::npos); + db_v_str = s_vec[2].substr(17); + + Status s; + s = ParseVersionStr(t_v_str, trace_version); + if (s != Status::OK()) { + return s; + } + s = ParseVersionStr(db_v_str, db_version); + return s; +} + +void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) { + assert(encoded_trace); + PutFixed64(encoded_trace, trace.ts); + encoded_trace->push_back(trace.type); + PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size())); + encoded_trace->append(trace.payload); +} + +Status TracerHelper::DecodeTrace(const std::string& encoded_trace, + Trace* trace) { + assert(trace != nullptr); + Slice enc_slice = Slice(encoded_trace); + if (!GetFixed64(&enc_slice, &trace->ts)) { + return Status::Incomplete("Decode trace string failed"); + } + if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) { + return Status::Incomplete("Decode trace string failed"); + } + trace->type = static_cast<TraceType>(enc_slice[0]); + enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); + trace->payload = enc_slice.ToString(); + return Status::OK(); +} + +Status TracerHelper::DecodeHeader(const std::string& encoded_trace, + Trace* header) { + Status s = TracerHelper::DecodeTrace(encoded_trace, header); + + if (header->type != kTraceBegin) { + return Status::Corruption("Corrupted trace file. Incorrect header."); + } + if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { + return Status::Corruption("Corrupted trace file. Incorrect magic."); + } + + return s; +} + +bool TracerHelper::SetPayloadMap(uint64_t& payload_map, + const TracePayloadType payload_type) { + uint64_t old_state = payload_map; + uint64_t tmp = 1; + payload_map |= (tmp << payload_type); + return old_state != payload_map; +} + +Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version, + std::unique_ptr<TraceRecord>* record) { + assert(trace != nullptr); + + if (record != nullptr) { + record->reset(nullptr); + } + + switch (trace->type) { + // Write + case kTraceWrite: { + PinnableSlice rep; + if (trace_file_version < 2) { + rep.PinSelf(trace->payload); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast<int64_t>(trace->payload_map); + Slice write_batch_data; + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast<uint32_t>(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kWriteBatchData: { + GetLengthPrefixedSlice(&buf, &write_batch_data); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + rep.PinSelf(write_batch_data); + } + + if (record != nullptr) { + record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts)); + } + + return Status::OK(); + } + // Get + case kTraceGet: { + uint32_t cf_id = 0; + Slice get_key; + + if (trace_file_version < 2) { + DecodeCFAndKey(trace->payload, &cf_id, &get_key); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast<int64_t>(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast<uint32_t>(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kGetCFID: { + GetFixed32(&buf, &cf_id); + break; + } + case TracePayloadType::kGetKey: { + GetLengthPrefixedSlice(&buf, &get_key); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + } + + if (record != nullptr) { + PinnableSlice ps; + ps.PinSelf(get_key); + record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts)); + } + + return Status::OK(); + } + // Iterator Seek and SeekForPrev + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: { + uint32_t cf_id = 0; + Slice iter_key; + Slice lower_bound; + Slice upper_bound; + + if (trace_file_version < 2) { + DecodeCFAndKey(trace->payload, &cf_id, &iter_key); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast<int64_t>(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast<uint32_t>(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kIterCFID: { + GetFixed32(&buf, &cf_id); + break; + } + case TracePayloadType::kIterKey: { + GetLengthPrefixedSlice(&buf, &iter_key); + break; + } + case TracePayloadType::kIterLowerBound: { + GetLengthPrefixedSlice(&buf, &lower_bound); + break; + } + case TracePayloadType::kIterUpperBound: { + GetLengthPrefixedSlice(&buf, &upper_bound); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + } + + if (record != nullptr) { + PinnableSlice ps_key; + ps_key.PinSelf(iter_key); + PinnableSlice ps_lower; + ps_lower.PinSelf(lower_bound); + PinnableSlice ps_upper; + ps_upper.PinSelf(upper_bound); + record->reset(new IteratorSeekQueryTraceRecord( + static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), + cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper), + trace->ts)); + } + + return Status::OK(); + } + // MultiGet + case kTraceMultiGet: { + if (trace_file_version < 2) { + return Status::Corruption("MultiGet is not supported."); + } + + uint32_t multiget_size = 0; + std::vector<uint32_t> cf_ids; + std::vector<PinnableSlice> multiget_keys; + + Slice cfids_payload; + Slice keys_payload; + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast<int64_t>(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast<uint32_t>(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kMultiGetSize: { + GetFixed32(&buf, &multiget_size); + break; + } + case TracePayloadType::kMultiGetCFIDs: { + GetLengthPrefixedSlice(&buf, &cfids_payload); + break; + } + case TracePayloadType::kMultiGetKeys: { + GetLengthPrefixedSlice(&buf, &keys_payload); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + if (multiget_size == 0) { + return Status::InvalidArgument("Empty MultiGet cf_ids or keys."); + } + + // Decode the cfids_payload and keys_payload + cf_ids.reserve(multiget_size); + multiget_keys.reserve(multiget_size); + for (uint32_t i = 0; i < multiget_size; i++) { + uint32_t tmp_cfid; + Slice tmp_key; + GetFixed32(&cfids_payload, &tmp_cfid); + GetLengthPrefixedSlice(&keys_payload, &tmp_key); + cf_ids.push_back(tmp_cfid); + Slice s(tmp_key); + PinnableSlice ps; + ps.PinSelf(s); + multiget_keys.push_back(std::move(ps)); + } + + if (record != nullptr) { + record->reset(new MultiGetQueryTraceRecord( + std::move(cf_ids), std::move(multiget_keys), trace->ts)); + } + + return Status::OK(); + } + default: + return Status::NotSupported("Unsupported trace type."); + } +} + +Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer) + : clock_(clock), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)), + trace_request_count_(0) { + // TODO: What if this fails? + WriteHeader().PermitUncheckedError(); +} + +Tracer::~Tracer() { trace_writer_.reset(); } + +Status Tracer::Write(WriteBatch* write_batch) { + TraceType trace_type = kTraceWrite; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = trace_type; + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kWriteBatchData); + PutFixed64(&trace.payload, trace.payload_map); + PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data())); + return WriteTrace(trace); +} + +Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { + TraceType trace_type = kTraceGet; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = trace_type; + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID); + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey); + // Encode the Get struct members into payload. Make sure add them in order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, column_family->GetID()); + PutLengthPrefixedSlice(&trace.payload, key); + return WriteTrace(trace); +} + +Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound) { + TraceType trace_type = kTraceIteratorSeek; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = trace_type; + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID); + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey); + if (lower_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterLowerBound); + } + if (upper_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterUpperBound); + } + // Encode the Iterator struct members into payload. Make sure add them in + // order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, cf_id); + PutLengthPrefixedSlice(&trace.payload, key); + if (lower_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, lower_bound); + } + if (upper_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, upper_bound); + } + return WriteTrace(trace); +} + +Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, + const Slice upper_bound) { + TraceType trace_type = kTraceIteratorSeekForPrev; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = trace_type; + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID); + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey); + if (lower_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterLowerBound); + } + if (upper_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterUpperBound); + } + // Encode the Iterator struct members into payload. Make sure add them in + // order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, cf_id); + PutLengthPrefixedSlice(&trace.payload, key); + if (lower_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, lower_bound); + } + if (upper_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, upper_bound); + } + return WriteTrace(trace); +} + +Status Tracer::MultiGet(const size_t num_keys, + ColumnFamilyHandle** column_families, + const Slice* keys) { + if (num_keys == 0) { + return Status::OK(); + } + std::vector<ColumnFamilyHandle*> v_column_families; + std::vector<Slice> v_keys; + v_column_families.resize(num_keys); + v_keys.resize(num_keys); + for (size_t i = 0; i < num_keys; i++) { + v_column_families[i] = column_families[i]; + v_keys[i] = keys[i]; + } + return MultiGet(v_column_families, v_keys); +} + +Status Tracer::MultiGet(const size_t num_keys, + ColumnFamilyHandle* column_family, const Slice* keys) { + if (num_keys == 0) { + return Status::OK(); + } + std::vector<ColumnFamilyHandle*> column_families; + std::vector<Slice> v_keys; + column_families.resize(num_keys); + v_keys.resize(num_keys); + for (size_t i = 0; i < num_keys; i++) { + column_families[i] = column_family; + v_keys[i] = keys[i]; + } + return MultiGet(column_families, v_keys); +} + +Status Tracer::MultiGet(const std::vector<ColumnFamilyHandle*>& column_families, + const std::vector<Slice>& keys) { + if (column_families.size() != keys.size()) { + return Status::Corruption("the CFs size and keys size does not match!"); + } + TraceType trace_type = kTraceMultiGet; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + uint32_t multiget_size = static_cast<uint32_t>(keys.size()); + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = trace_type; + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kMultiGetSize); + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kMultiGetCFIDs); + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kMultiGetKeys); + // Encode the CFIDs inorder + std::string cfids_payload; + std::string keys_payload; + for (uint32_t i = 0; i < multiget_size; i++) { + assert(i < column_families.size()); + assert(i < keys.size()); + PutFixed32(&cfids_payload, column_families[i]->GetID()); + PutLengthPrefixedSlice(&keys_payload, keys[i]); + } + // Encode the Get struct members into payload. Make sure add them in order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, multiget_size); + PutLengthPrefixedSlice(&trace.payload, cfids_payload); + PutLengthPrefixedSlice(&trace.payload, keys_payload); + return WriteTrace(trace); +} + +bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { + if (IsTraceFileOverMax()) { + return true; + } + + TraceFilterType filter_mask = kTraceFilterNone; + switch (trace_type) { + case kTraceNone: + case kTraceBegin: + case kTraceEnd: + filter_mask = kTraceFilterNone; + break; + case kTraceWrite: + filter_mask = kTraceFilterWrite; + break; + case kTraceGet: + filter_mask = kTraceFilterGet; + break; + case kTraceIteratorSeek: + filter_mask = kTraceFilterIteratorSeek; + break; + case kTraceIteratorSeekForPrev: + filter_mask = kTraceFilterIteratorSeekForPrev; + break; + case kBlockTraceIndexBlock: + case kBlockTraceFilterBlock: + case kBlockTraceDataBlock: + case kBlockTraceUncompressionDictBlock: + case kBlockTraceRangeDeletionBlock: + case kIOTracer: + filter_mask = kTraceFilterNone; + break; + case kTraceMultiGet: + filter_mask = kTraceFilterMultiGet; + break; + case kTraceMax: + assert(false); + filter_mask = kTraceFilterNone; + break; + } + if (filter_mask != kTraceFilterNone && trace_options_.filter & filter_mask) { + return true; + } + + ++trace_request_count_; + if (trace_request_count_ < trace_options_.sampling_frequency) { + return true; + } + trace_request_count_ = 0; + return false; +} + +bool Tracer::IsTraceFileOverMax() { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + return (trace_file_size > trace_options_.max_trace_file_size); +} + +Status Tracer::WriteHeader() { + std::ostringstream s; + s << kTraceMagic << "\t" + << "Trace Version: " << kTraceFileMajorVersion << "." + << kTraceFileMinorVersion << "\t" + << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" + << "Format: Timestamp OpType Payload\n"; + std::string header(s.str()); + + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = kTraceBegin; + trace.payload = header; + return WriteTrace(trace); +} + +Status Tracer::WriteFooter() { + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = kTraceEnd; + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kEmptyPayload); + trace.payload = ""; + return WriteTrace(trace); +} + +Status Tracer::WriteTrace(const Trace& trace) { + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(Slice(encoded_trace)); +} + +Status Tracer::Close() { return WriteFooter(); } + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/trace_replay/trace_replay.h b/src/rocksdb/trace_replay/trace_replay.h new file mode 100644 index 000000000..9aba5ceb7 --- /dev/null +++ b/src/rocksdb/trace_replay/trace_replay.h @@ -0,0 +1,183 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <memory> +#include <mutex> +#include <unordered_map> +#include <utility> + +#include "rocksdb/options.h" +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_record.h" +#include "rocksdb/utilities/replayer.h" + +namespace ROCKSDB_NAMESPACE { + +// This file contains Tracer and Replayer classes that enable capturing and +// replaying RocksDB traces. + +class ColumnFamilyHandle; +class ColumnFamilyData; +class DB; +class DBImpl; +class Env; +class Slice; +class SystemClock; +class TraceReader; +class TraceWriter; +class WriteBatch; + +struct ReadOptions; +struct TraceOptions; +struct WriteOptions; + +extern const std::string kTraceMagic; +const unsigned int kTraceTimestampSize = 8; +const unsigned int kTraceTypeSize = 1; +const unsigned int kTracePayloadLengthSize = 4; +const unsigned int kTraceMetadataSize = + kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize; + +static const int kTraceFileMajorVersion = 0; +static const int kTraceFileMinorVersion = 2; + +// The data structure that defines a single trace. +struct Trace { + uint64_t ts; // timestamp + TraceType type; + // Each bit in payload_map stores which corresponding struct member added in + // the payload. Each TraceType has its corresponding payload struct. For + // example, if bit at position 0 is set in write payload, then the write batch + // will be addedd. + uint64_t payload_map = 0; + // Each trace type has its own payload_struct, which will be serilized in the + // payload. + std::string payload; + + void reset() { + ts = 0; + type = kTraceMax; + payload_map = 0; + payload.clear(); + } +}; + +enum TracePayloadType : char { + // Each member of all query payload structs should have a corresponding flag + // here. Make sure to add them sequentially in the order of it is added. + kEmptyPayload = 0, + kWriteBatchData = 1, + kGetCFID = 2, + kGetKey = 3, + kIterCFID = 4, + kIterKey = 5, + kIterLowerBound = 6, + kIterUpperBound = 7, + kMultiGetSize = 8, + kMultiGetCFIDs = 9, + kMultiGetKeys = 10, +}; + +class TracerHelper { + public: + // Parse the string with major and minor version only + static Status ParseVersionStr(std::string& v_string, int* v_num); + + // Parse the trace file version and db version in trace header + static Status ParseTraceHeader(const Trace& header, int* trace_version, + int* db_version); + + // Encode a version 0.1 trace object into the given string. + static void EncodeTrace(const Trace& trace, std::string* encoded_trace); + + // Decode a string into the given trace object. + static Status DecodeTrace(const std::string& encoded_trace, Trace* trace); + + // Decode a string into the given trace header. + static Status DecodeHeader(const std::string& encoded_trace, Trace* header); + + // Set the payload map based on the payload type + static bool SetPayloadMap(uint64_t& payload_map, + const TracePayloadType payload_type); + + // Decode a Trace object into the corresponding TraceRecord. + // Return Status::OK() if nothing is wrong, record will be set accordingly. + // Return Status::NotSupported() if the trace type is not support, or the + // corresponding error status, record will be set to nullptr. + static Status DecodeTraceRecord(Trace* trace, int trace_file_version, + std::unique_ptr<TraceRecord>* record); +}; + +// Tracer captures all RocksDB operations using a user-provided TraceWriter. +// Every RocksDB operation is written as a single trace. Each trace will have a +// timestamp and type, followed by the trace payload. +class Tracer { + public: + Tracer(SystemClock* clock, const TraceOptions& trace_options, + std::unique_ptr<TraceWriter>&& trace_writer); + ~Tracer(); + + // Trace all write operations -- Put, Merge, Delete, SingleDelete, Write + Status Write(WriteBatch* write_batch); + + // Trace Get operations. + Status Get(ColumnFamilyHandle* cfname, const Slice& key); + + // Trace Iterators. + Status IteratorSeek(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound); + Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound); + + // Trace MultiGet + + Status MultiGet(const size_t num_keys, ColumnFamilyHandle** column_families, + const Slice* keys); + + Status MultiGet(const size_t num_keys, ColumnFamilyHandle* column_family, + const Slice* keys); + + Status MultiGet(const std::vector<ColumnFamilyHandle*>& column_family, + const std::vector<Slice>& keys); + + // Returns true if the trace is over the configured max trace file limit. + // False otherwise. + bool IsTraceFileOverMax(); + + // Returns true if the order of write trace records must match the order of + // the corresponding records logged to WAL and applied to the DB. + bool IsWriteOrderPreserved() { return trace_options_.preserve_write_order; } + + // Writes a trace footer at the end of the tracing + Status Close(); + + private: + // Write a trace header at the beginning, typically on initiating a trace, + // with some metadata like a magic number, trace version, RocksDB version, and + // trace format. + Status WriteHeader(); + + // Write a trace footer, typically on ending a trace, with some metadata. + Status WriteFooter(); + + // Write a single trace using the provided TraceWriter to the underlying + // system, say, a filesystem or a streaming service. + Status WriteTrace(const Trace& trace); + + // Helps in filtering and sampling of traces. + // Returns true if a trace should be skipped, false otherwise. + bool ShouldSkipTrace(const TraceType& type); + + SystemClock* clock_; + TraceOptions trace_options_; + std::unique_ptr<TraceWriter> trace_writer_; + uint64_t trace_request_count_; +}; + +} // namespace ROCKSDB_NAMESPACE |