// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "trace_replay/block_cache_tracer.h" #include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/trace_reader_writer.h" #include "test_util/testharness.h" #include "test_util/testutil.h" namespace ROCKSDB_NAMESPACE { namespace { const uint64_t kBlockSize = 1024; const std::string kBlockKeyPrefix = "test-block-"; const uint32_t kCFId = 0; const uint32_t kLevel = 1; const uint64_t kSSTFDNumber = 100; const std::string kRefKeyPrefix = "test-get-"; const uint64_t kNumKeysInBlock = 1024; const uint64_t kReferencedDataSize = 10; } // namespace class BlockCacheTracerTest : public testing::Test { public: BlockCacheTracerTest() { test_path_ = test::PerThreadDBPath("block_cache_tracer_test"); env_ = ROCKSDB_NAMESPACE::Env::Default(); EXPECT_OK(env_->CreateDir(test_path_)); trace_file_path_ = test_path_ + "/block_cache_trace"; } ~BlockCacheTracerTest() override { EXPECT_OK(env_->DeleteFile(trace_file_path_)); EXPECT_OK(env_->DeleteDir(test_path_)); } TableReaderCaller GetCaller(uint32_t key_id) { uint32_t n = key_id % 5; switch (n) { case 0: return TableReaderCaller::kPrefetch; case 1: return TableReaderCaller::kCompaction; case 2: return TableReaderCaller::kUserGet; case 3: return TableReaderCaller::kUserMultiGet; case 4: return TableReaderCaller::kUserIterator; } assert(false); } void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id, TraceType block_type, uint32_t nblocks) { assert(writer); for (uint32_t i = 0; i < nblocks; i++) { uint32_t key_id = from_key_id + i; BlockCacheTraceRecord record; record.block_type = block_type; record.block_size = kBlockSize + key_id; record.block_key = (kBlockKeyPrefix + std::to_string(key_id)); record.access_timestamp = env_->NowMicros(); record.cf_id = kCFId; record.cf_name = kDefaultColumnFamilyName; record.caller = GetCaller(key_id); record.level = kLevel; record.sst_fd_number = kSSTFDNumber + key_id; record.is_cache_hit = Boolean::kFalse; record.no_insert = Boolean::kFalse; // Provide get_id for all callers. The writer should only write get_id // when the caller is either GET or MGET. record.get_id = key_id + 1; record.get_from_user_specified_snapshot = Boolean::kTrue; // Provide these fields for all block types. // The writer should only write these fields for data blocks and the // caller is either GET or MGET. record.referenced_key = (kRefKeyPrefix + std::to_string(key_id)); record.referenced_key_exist_in_block = Boolean::kTrue; record.num_keys_in_block = kNumKeysInBlock; record.referenced_data_size = kReferencedDataSize + key_id; ASSERT_OK(writer->WriteBlockAccess( record, record.block_key, record.cf_name, record.referenced_key)); } } BlockCacheTraceRecord GenerateAccessRecord() { uint32_t key_id = 0; BlockCacheTraceRecord record; record.block_type = TraceType::kBlockTraceDataBlock; record.block_size = kBlockSize; record.block_key = kBlockKeyPrefix + std::to_string(key_id); record.access_timestamp = env_->NowMicros(); record.cf_id = kCFId; record.cf_name = kDefaultColumnFamilyName; record.caller = GetCaller(key_id); record.level = kLevel; record.sst_fd_number = kSSTFDNumber + key_id; record.is_cache_hit = Boolean::kFalse; record.no_insert = Boolean::kFalse; record.referenced_key = kRefKeyPrefix + std::to_string(key_id); record.referenced_key_exist_in_block = Boolean::kTrue; record.num_keys_in_block = kNumKeysInBlock; return record; } void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id, TraceType block_type, uint32_t nblocks) { assert(reader); for (uint32_t i = 0; i < nblocks; i++) { uint32_t key_id = from_key_id + i; BlockCacheTraceRecord record; ASSERT_OK(reader->ReadAccess(&record)); ASSERT_EQ(block_type, record.block_type); ASSERT_EQ(kBlockSize + key_id, record.block_size); ASSERT_EQ(kBlockKeyPrefix + std::to_string(key_id), record.block_key); ASSERT_EQ(kCFId, record.cf_id); ASSERT_EQ(kDefaultColumnFamilyName, record.cf_name); ASSERT_EQ(GetCaller(key_id), record.caller); ASSERT_EQ(kLevel, record.level); ASSERT_EQ(kSSTFDNumber + key_id, record.sst_fd_number); ASSERT_EQ(Boolean::kFalse, record.is_cache_hit); ASSERT_EQ(Boolean::kFalse, record.no_insert); if (record.caller == TableReaderCaller::kUserGet || record.caller == TableReaderCaller::kUserMultiGet) { ASSERT_EQ(key_id + 1, record.get_id); ASSERT_EQ(Boolean::kTrue, record.get_from_user_specified_snapshot); ASSERT_EQ(kRefKeyPrefix + std::to_string(key_id), record.referenced_key); } else { ASSERT_EQ(BlockCacheTraceHelper::kReservedGetId, record.get_id); ASSERT_EQ(Boolean::kFalse, record.get_from_user_specified_snapshot); ASSERT_EQ("", record.referenced_key); } if (block_type == TraceType::kBlockTraceDataBlock && (record.caller == TableReaderCaller::kUserGet || record.caller == TableReaderCaller::kUserMultiGet)) { ASSERT_EQ(Boolean::kTrue, record.referenced_key_exist_in_block); ASSERT_EQ(kNumKeysInBlock, record.num_keys_in_block); ASSERT_EQ(kReferencedDataSize + key_id, record.referenced_data_size); continue; } ASSERT_EQ(Boolean::kFalse, record.referenced_key_exist_in_block); ASSERT_EQ(0, record.num_keys_in_block); ASSERT_EQ(0, record.referenced_data_size); } } Env* env_; EnvOptions env_options_; std::string trace_file_path_; std::string test_path_; }; TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) { BlockCacheTraceRecord record = GenerateAccessRecord(); { std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); BlockCacheTracer writer; // The record should be written to the trace_file since StartTrace is not // called. ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, record.referenced_key)); ASSERT_OK(env_->FileExists(trace_file_path_)); } { // Verify trace file contains nothing. std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, &trace_reader)); BlockCacheTraceReader reader(std::move(trace_reader)); BlockCacheTraceHeader header; ASSERT_NOK(reader.ReadHeader(&header)); } } TEST_F(BlockCacheTracerTest, AtomicWrite) { BlockCacheTraceRecord record = GenerateAccessRecord(); { TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); BlockCacheTracer writer; ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, record.referenced_key)); ASSERT_OK(env_->FileExists(trace_file_path_)); } { // Verify trace file contains one record. std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, &trace_reader)); BlockCacheTraceReader reader(std::move(trace_reader)); BlockCacheTraceHeader header; ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); ASSERT_NOK(reader.ReadAccess(&record)); } } TEST_F(BlockCacheTracerTest, ConsecutiveStartTrace) { TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK( NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); BlockCacheTracer writer; ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); ASSERT_NOK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); ASSERT_OK(env_->FileExists(trace_file_path_)); } TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) { BlockCacheTraceRecord record = GenerateAccessRecord(); { TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); BlockCacheTracer writer; ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, record.referenced_key)); writer.EndTrace(); // Write the record again. This time the record should not be written since // EndTrace is called. ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name, record.referenced_key)); ASSERT_OK(env_->FileExists(trace_file_path_)); } { // Verify trace file contains one record. std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, &trace_reader)); BlockCacheTraceReader reader(std::move(trace_reader)); BlockCacheTraceHeader header; ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1); ASSERT_NOK(reader.ReadAccess(&record)); } } TEST_F(BlockCacheTracerTest, NextGetId) { BlockCacheTracer writer; { TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); // next get id should always return 0 before we call StartTrace. ASSERT_EQ(0, writer.NextGetId()); ASSERT_EQ(0, writer.NextGetId()); ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); ASSERT_EQ(1, writer.NextGetId()); ASSERT_EQ(2, writer.NextGetId()); writer.EndTrace(); // next get id should return 0. ASSERT_EQ(0, writer.NextGetId()); } // Start trace again and next get id should return 1. { TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer))); ASSERT_EQ(1, writer.NextGetId()); } } TEST_F(BlockCacheTracerTest, MixedBlocks) { { // Generate a trace file containing a mix of blocks. TraceOptions trace_opt; std::unique_ptr trace_writer; ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer)); BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer)); ASSERT_OK(writer.WriteHeader()); // Write blocks of different types. WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock, 10); WriteBlockAccess(&writer, 10, TraceType::kBlockTraceDataBlock, 10); WriteBlockAccess(&writer, 20, TraceType::kBlockTraceFilterBlock, 10); WriteBlockAccess(&writer, 30, TraceType::kBlockTraceIndexBlock, 10); WriteBlockAccess(&writer, 40, TraceType::kBlockTraceRangeDeletionBlock, 10); ASSERT_OK(env_->FileExists(trace_file_path_)); } { // Verify trace file is generated correctly. std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, &trace_reader)); BlockCacheTraceReader reader(std::move(trace_reader)); BlockCacheTraceHeader header; ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); // Read blocks. VerifyAccess(&reader, 0, TraceType::kBlockTraceUncompressionDictBlock, 10); VerifyAccess(&reader, 10, TraceType::kBlockTraceDataBlock, 10); VerifyAccess(&reader, 20, TraceType::kBlockTraceFilterBlock, 10); VerifyAccess(&reader, 30, TraceType::kBlockTraceIndexBlock, 10); VerifyAccess(&reader, 40, TraceType::kBlockTraceRangeDeletionBlock, 10); // Read one more record should report an error. BlockCacheTraceRecord record; ASSERT_NOK(reader.ReadAccess(&record)); } } TEST_F(BlockCacheTracerTest, HumanReadableTrace) { BlockCacheTraceRecord record = GenerateAccessRecord(); record.get_id = 1; record.referenced_key = ""; record.caller = TableReaderCaller::kUserGet; record.get_from_user_specified_snapshot = Boolean::kTrue; record.referenced_data_size = kReferencedDataSize; PutFixed32(&record.referenced_key, 111); PutLengthPrefixedSlice(&record.referenced_key, "get_key"); PutFixed64(&record.referenced_key, 2 << 8); PutLengthPrefixedSlice(&record.block_key, "block_key"); PutVarint64(&record.block_key, 333); { // Generate a human readable trace file. BlockCacheHumanReadableTraceWriter writer; ASSERT_OK(writer.NewWritableFile(trace_file_path_, env_)); ASSERT_OK(writer.WriteHumanReadableTraceRecord(record, 1, 1)); ASSERT_OK(env_->FileExists(trace_file_path_)); } { BlockCacheHumanReadableTraceReader reader(trace_file_path_); BlockCacheTraceHeader header; BlockCacheTraceRecord read_record; ASSERT_OK(reader.ReadHeader(&header)); ASSERT_OK(reader.ReadAccess(&read_record)); ASSERT_EQ(TraceType::kBlockTraceDataBlock, read_record.block_type); ASSERT_EQ(kBlockSize, read_record.block_size); ASSERT_EQ(kCFId, read_record.cf_id); ASSERT_EQ(kDefaultColumnFamilyName, read_record.cf_name); ASSERT_EQ(TableReaderCaller::kUserGet, read_record.caller); ASSERT_EQ(kLevel, read_record.level); ASSERT_EQ(kSSTFDNumber, read_record.sst_fd_number); ASSERT_EQ(Boolean::kFalse, read_record.is_cache_hit); ASSERT_EQ(Boolean::kFalse, read_record.no_insert); ASSERT_EQ(1, read_record.get_id); ASSERT_EQ(Boolean::kTrue, read_record.get_from_user_specified_snapshot); ASSERT_EQ(Boolean::kTrue, read_record.referenced_key_exist_in_block); ASSERT_EQ(kNumKeysInBlock, read_record.num_keys_in_block); ASSERT_EQ(kReferencedDataSize, read_record.referenced_data_size); ASSERT_EQ(record.block_key.size(), read_record.block_key.size()); ASSERT_EQ(record.referenced_key.size(), record.referenced_key.size()); ASSERT_EQ(112, BlockCacheTraceHelper::GetTableId(read_record)); ASSERT_EQ(3, BlockCacheTraceHelper::GetSequenceNumber(read_record)); ASSERT_EQ(333, BlockCacheTraceHelper::GetBlockOffsetInFile(read_record)); // Read again should fail. ASSERT_NOK(reader.ReadAccess(&read_record)); } } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }