// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once #ifndef ROCKSDB_LITE #include #include #include #include #include #include #include "rocksdb/env.h" #include "rocksdb/trace_reader_writer.h" #include "rocksdb/write_batch.h" #include "trace_replay/trace_replay.h" namespace ROCKSDB_NAMESPACE { class DBImpl; class WriteBatch; enum TraceOperationType : int { kGet = 0, kPut = 1, kDelete = 2, kSingleDelete = 3, kRangeDelete = 4, kMerge = 5, kIteratorSeek = 6, kIteratorSeekForPrev = 7, kTaTypeNum = 8 }; struct TraceUnit { uint64_t ts; uint32_t type; uint32_t cf_id; size_t value_size; std::string key; }; struct TypeCorrelation { uint64_t count; uint64_t total_ts; }; struct StatsUnit { uint64_t key_id; uint64_t access_count; uint64_t latest_ts; uint64_t succ_count; // current only used to count Get if key found uint32_t cf_id; size_t value_size; std::vector v_correlation; }; class AnalyzerOptions { public: std::vector> correlation_map; std::vector> correlation_list; AnalyzerOptions(); ~AnalyzerOptions(); void SparseCorrelationInput(const std::string& in_str); }; // Note that, for the variable names in the trace_analyzer, // Starting with 'a_' means the variable is used for 'accessed_keys'. // Starting with 'w_' means it is used for 'the whole key space'. // Ending with '_f' means a file write or reader pointer. // For example, 'a_count' means 'accessed_keys_count', // 'w_key_f' means 'whole_key_space_file'. struct TraceStats { uint32_t cf_id; std::string cf_name; uint64_t a_count; uint64_t a_succ_count; uint64_t a_key_id; uint64_t a_key_size_sqsum; uint64_t a_key_size_sum; uint64_t a_key_mid; uint64_t a_value_size_sqsum; uint64_t a_value_size_sum; uint64_t a_value_mid; uint32_t a_peak_qps; double a_ave_qps; std::map a_key_stats; std::map a_count_stats; std::map a_key_size_stats; std::map a_value_size_stats; std::map a_qps_stats; std::map> a_qps_prefix_stats; std::priority_queue, std::vector>, std::greater>> top_k_queue; std::priority_queue, std::vector>, std::greater>> top_k_prefix_access; std::priority_queue, std::vector>, std::greater>> top_k_prefix_ave; std::priority_queue, std::vector>, std::greater>> top_k_qps_sec; std::list time_series; std::vector> correlation_output; std::map uni_key_num; std::unique_ptr time_series_f; std::unique_ptr a_key_f; std::unique_ptr a_count_dist_f; std::unique_ptr a_prefix_cut_f; std::unique_ptr a_value_size_f; std::unique_ptr a_key_size_f; std::unique_ptr a_key_num_f; std::unique_ptr a_qps_f; std::unique_ptr a_top_qps_prefix_f; std::unique_ptr w_key_f; std::unique_ptr w_prefix_cut_f; TraceStats(); ~TraceStats(); TraceStats(const TraceStats&) = delete; TraceStats& operator=(const TraceStats&) = delete; TraceStats(TraceStats&&) = default; TraceStats& operator=(TraceStats&&) = default; }; struct TypeUnit { std::string type_name; bool enabled; uint64_t total_keys; uint64_t total_access; uint64_t total_succ_access; uint32_t sample_count; std::map stats; TypeUnit() = default; ~TypeUnit() = default; TypeUnit(const TypeUnit&) = delete; TypeUnit& operator=(const TypeUnit&) = delete; TypeUnit(TypeUnit&&) = default; TypeUnit& operator=(TypeUnit&&) = default; }; struct CfUnit { uint32_t cf_id; uint64_t w_count; // total keys in this cf if we use the whole key space uint64_t a_count; // the total keys in this cf that are accessed std::map w_key_size_stats; // whole key space key size // statistic this cf std::map cf_qps; }; class TraceAnalyzer { public: TraceAnalyzer(std::string& trace_path, std::string& output_path, AnalyzerOptions _analyzer_opts); ~TraceAnalyzer(); Status PrepareProcessing(); Status StartProcessing(); Status MakeStatistics(); Status ReProcessing(); Status EndProcessing(); Status WriteTraceUnit(TraceUnit& unit); // The trace processing functions for different type Status HandleGet(uint32_t column_family_id, const std::string& key, const uint64_t& ts, const uint32_t& get_ret); Status HandlePut(uint32_t column_family_id, const Slice& key, const Slice& value); Status HandleDelete(uint32_t column_family_id, const Slice& key); Status HandleSingleDelete(uint32_t column_family_id, const Slice& key); Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key, const Slice& end_key); Status HandleMerge(uint32_t column_family_id, const Slice& key, const Slice& value); Status HandleIter(uint32_t column_family_id, const std::string& key, const uint64_t& ts, TraceType& trace_type); std::vector& GetTaVector() { return ta_; } private: ROCKSDB_NAMESPACE::Env* env_; EnvOptions env_options_; std::unique_ptr trace_reader_; size_t offset_; char buffer_[1024]; uint64_t c_time_; std::string trace_name_; std::string output_path_; AnalyzerOptions analyzer_opts_; uint64_t total_requests_; uint64_t total_access_keys_; uint64_t total_gets_; uint64_t total_writes_; uint64_t trace_create_time_; uint64_t begin_time_; uint64_t end_time_; uint64_t time_series_start_; uint32_t sample_max_; uint32_t cur_time_sec_; std::unique_ptr trace_sequence_f_; // readable trace std::unique_ptr qps_f_; // overall qps std::unique_ptr cf_qps_f_; // The qps of each CF> std::vector ta_; // The main statistic collecting data structure std::map cfs_; // All the cf_id appears in this trace; std::vector qps_peak_; std::vector qps_ave_; Status ReadTraceHeader(Trace* header); Status ReadTraceFooter(Trace* footer); Status ReadTraceRecord(Trace* trace); Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id, const std::string& key, const size_t value_size, const uint64_t ts); Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type, const uint64_t& ts, const std::string& key); Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats); Status CreateOutputFile( const std::string& type, const std::string& cf_name, const std::string& ending, std::unique_ptr* f_ptr); void CloseOutputFiles(); void PrintStatistics(); Status TraceUnitWriter( std::unique_ptr& f_ptr, TraceUnit& unit); Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id, const std::string& key, const size_t value_size, const uint64_t ts); Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); Status MakeStatisticQPS(); }; // write bach handler to be used for WriteBache iterator // when processing the write trace class TraceWriteHandler : public WriteBatch::Handler { public: TraceWriteHandler() { ta_ptr = nullptr; } explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; } ~TraceWriteHandler() {} virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { return ta_ptr->HandlePut(column_family_id, key, value); } virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { return ta_ptr->HandleDelete(column_family_id, key); } virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { return ta_ptr->HandleSingleDelete(column_family_id, key); } virtual Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, const Slice& end_key) override { return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { return ta_ptr->HandleMerge(column_family_id, key, value); } private: TraceAnalyzer* ta_ptr; }; int trace_analyzer_tool(int argc, char** argv); } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE