summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/tools/trace_analyzer_tool.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/tools/trace_analyzer_tool.h')
-rw-r--r--src/rocksdb/tools/trace_analyzer_tool.h326
1 files changed, 326 insertions, 0 deletions
diff --git a/src/rocksdb/tools/trace_analyzer_tool.h b/src/rocksdb/tools/trace_analyzer_tool.h
new file mode 100644
index 000000000..4b885b18c
--- /dev/null
+++ b/src/rocksdb/tools/trace_analyzer_tool.h
@@ -0,0 +1,326 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <list>
+#include <map>
+#include <queue>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "rocksdb/env.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/write_batch.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Value sizes may be used as denominators. Replacing 0 value sizes with this
+// positive integer avoids division error.
+extern const size_t kShadowValueSize /* = 10*/;
+
+enum TraceOperationType : int {
+ kGet = 0,
+ kPut = 1,
+ kDelete = 2,
+ kSingleDelete = 3,
+ kRangeDelete = 4,
+ kMerge = 5,
+ kIteratorSeek = 6,
+ kIteratorSeekForPrev = 7,
+ kMultiGet = 8,
+ kTaTypeNum = 9
+};
+
+struct TraceUnit {
+ uint64_t ts;
+ uint32_t type;
+ uint32_t cf_id;
+ size_t value_size;
+ std::string key;
+};
+
+struct TypeCorrelation {
+ uint64_t count;
+ uint64_t total_ts;
+};
+
+struct StatsUnit {
+ uint64_t key_id;
+ uint64_t access_count;
+ uint64_t latest_ts;
+ uint64_t succ_count; // current only used to count Get if key found
+ uint32_t cf_id;
+ size_t value_size;
+ std::vector<TypeCorrelation> v_correlation;
+};
+
+class AnalyzerOptions {
+ public:
+ std::vector<std::vector<int>> correlation_map;
+ std::vector<std::pair<int, int>> correlation_list;
+
+ AnalyzerOptions();
+
+ ~AnalyzerOptions();
+
+ void SparseCorrelationInput(const std::string& in_str);
+};
+
+// Note that, for the variable names in the trace_analyzer,
+// Starting with 'a_' means the variable is used for 'accessed_keys'.
+// Starting with 'w_' means it is used for 'the whole key space'.
+// Ending with '_f' means a file write or reader pointer.
+// For example, 'a_count' means 'accessed_keys_count',
+// 'w_key_f' means 'whole_key_space_file'.
+
+struct TraceStats {
+ uint32_t cf_id;
+ std::string cf_name;
+ uint64_t a_count;
+ uint64_t a_succ_count;
+ uint64_t a_key_id;
+ uint64_t a_key_size_sqsum;
+ uint64_t a_key_size_sum;
+ uint64_t a_key_mid;
+ uint64_t a_value_size_sqsum;
+ uint64_t a_value_size_sum;
+ uint64_t a_value_mid;
+ uint32_t a_peak_qps;
+ double a_ave_qps;
+ std::map<std::string, StatsUnit> a_key_stats;
+ std::map<uint64_t, uint64_t> a_count_stats;
+ std::map<uint64_t, uint64_t> a_key_size_stats;
+ std::map<uint64_t, uint64_t> a_value_size_stats;
+ std::map<uint32_t, uint32_t> a_qps_stats;
+ std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats;
+ std::priority_queue<std::pair<uint64_t, std::string>,
+ std::vector<std::pair<uint64_t, std::string>>,
+ std::greater<std::pair<uint64_t, std::string>>>
+ top_k_queue;
+ std::priority_queue<std::pair<uint64_t, std::string>,
+ std::vector<std::pair<uint64_t, std::string>>,
+ std::greater<std::pair<uint64_t, std::string>>>
+ top_k_prefix_access;
+ std::priority_queue<std::pair<double, std::string>,
+ std::vector<std::pair<double, std::string>>,
+ std::greater<std::pair<double, std::string>>>
+ top_k_prefix_ave;
+ std::priority_queue<std::pair<uint32_t, uint32_t>,
+ std::vector<std::pair<uint32_t, uint32_t>>,
+ std::greater<std::pair<uint32_t, uint32_t>>>
+ top_k_qps_sec;
+ std::list<TraceUnit> time_series;
+ std::vector<std::pair<uint64_t, uint64_t>> correlation_output;
+ std::map<uint32_t, uint64_t> uni_key_num;
+
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> time_series_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_count_dist_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_prefix_cut_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_value_size_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_size_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_num_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_qps_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_top_qps_prefix_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_key_f;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_prefix_cut_f;
+
+ TraceStats();
+ ~TraceStats();
+ TraceStats(const TraceStats&) = delete;
+ TraceStats& operator=(const TraceStats&) = delete;
+ TraceStats(TraceStats&&) = default;
+ TraceStats& operator=(TraceStats&&) = default;
+};
+
+struct TypeUnit {
+ std::string type_name;
+ bool enabled;
+ uint64_t total_keys;
+ uint64_t total_access;
+ uint64_t total_succ_access;
+ uint32_t sample_count;
+ std::map<uint32_t, TraceStats> stats;
+ TypeUnit() = default;
+ ~TypeUnit() = default;
+ TypeUnit(const TypeUnit&) = delete;
+ TypeUnit& operator=(const TypeUnit&) = delete;
+ TypeUnit(TypeUnit&&) = default;
+ TypeUnit& operator=(TypeUnit&&) = default;
+};
+
+struct CfUnit {
+ uint32_t cf_id;
+ uint64_t w_count; // total keys in this cf if we use the whole key space
+ uint64_t a_count; // the total keys in this cf that are accessed
+ std::map<uint64_t, uint64_t> w_key_size_stats; // whole key space key size
+ // statistic this cf
+ std::map<uint32_t, uint32_t> cf_qps;
+};
+
+class TraceAnalyzer : private TraceRecord::Handler,
+ private WriteBatch::Handler {
+ public:
+ TraceAnalyzer(std::string& trace_path, std::string& output_path,
+ AnalyzerOptions _analyzer_opts);
+ ~TraceAnalyzer();
+
+ Status PrepareProcessing();
+
+ Status StartProcessing();
+
+ Status MakeStatistics();
+
+ Status ReProcessing();
+
+ Status EndProcessing();
+
+ Status WriteTraceUnit(TraceUnit& unit);
+
+ std::vector<TypeUnit>& GetTaVector() { return ta_; }
+
+ private:
+ using TraceRecord::Handler::Handle;
+ Status Handle(const WriteQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ Status Handle(const GetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ Status Handle(const IteratorSeekQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+ Status Handle(const MultiGetQueryTraceRecord& record,
+ std::unique_ptr<TraceRecordResult>* result) override;
+
+ using WriteBatch::Handler::PutCF;
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override;
+
+ using WriteBatch::Handler::DeleteCF;
+ Status DeleteCF(uint32_t column_family_id, const Slice& key) override;
+
+ using WriteBatch::Handler::SingleDeleteCF;
+ Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override;
+
+ using WriteBatch::Handler::DeleteRangeCF;
+ Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
+ const Slice& end_key) override;
+
+ using WriteBatch::Handler::MergeCF;
+ Status MergeCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override;
+
+ // The following hanlders are not implemented, return Status::OK() to avoid
+ // the running time assertion and other irrelevant falures.
+ using WriteBatch::Handler::PutBlobIndexCF;
+ Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::OK();
+ }
+
+ // The default implementation of LogData does nothing.
+ using WriteBatch::Handler::LogData;
+ void LogData(const Slice& /*blob*/) override {}
+
+ using WriteBatch::Handler::MarkBeginPrepare;
+ Status MarkBeginPrepare(bool = false) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkEndPrepare;
+ Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkNoop;
+ Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkRollback;
+ Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkCommit;
+ Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }
+
+ using WriteBatch::Handler::MarkCommitWithTimestamp;
+ Status MarkCommitWithTimestamp(const Slice& /*xid*/,
+ const Slice& /*commit_ts*/) override {
+ return Status::OK();
+ }
+
+ // Process each trace operation and output the analysis result to
+ // stdout/files.
+ Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
+ std::vector<uint32_t> cf_ids,
+ std::vector<Slice> keys,
+ std::vector<size_t> value_sizes);
+
+ Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
+ uint32_t cf_id, const Slice& key,
+ size_t value_size);
+
+ ROCKSDB_NAMESPACE::Env* env_;
+ EnvOptions env_options_;
+ std::unique_ptr<TraceReader> trace_reader_;
+ size_t offset_;
+ char buffer_[1024];
+ // Timestamp of a WriteBatch, used in its iteration.
+ uint64_t write_batch_ts_;
+ std::string trace_name_;
+ std::string output_path_;
+ AnalyzerOptions analyzer_opts_;
+ uint64_t total_requests_;
+ uint64_t total_access_keys_;
+ uint64_t total_gets_;
+ uint64_t total_writes_;
+ uint64_t total_seeks_;
+ uint64_t total_seek_prevs_;
+ uint64_t total_multigets_;
+ uint64_t trace_create_time_;
+ uint64_t begin_time_;
+ uint64_t end_time_;
+ uint64_t time_series_start_;
+ uint32_t sample_max_;
+ uint32_t cur_time_sec_;
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
+ trace_sequence_f_; // readable trace
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> qps_f_; // overall qps
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
+ cf_qps_f_; // The qps of each CF>
+ std::vector<TypeUnit> ta_; // The main statistic collecting data structure
+ std::map<uint32_t, CfUnit> cfs_; // All the cf_id appears in this trace;
+ std::vector<uint32_t> qps_peak_;
+ std::vector<double> qps_ave_;
+
+ Status ReadTraceHeader(Trace* header);
+ Status ReadTraceFooter(Trace* footer);
+ Status ReadTraceRecord(Trace* trace);
+ Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id,
+ const std::string& key, const size_t value_size,
+ const uint64_t ts);
+ Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type,
+ const uint64_t& ts, const std::string& key);
+ Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats);
+ Status CreateOutputFile(
+ const std::string& type, const std::string& cf_name,
+ const std::string& ending,
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr);
+ Status CloseOutputFiles();
+
+ void PrintStatistics();
+ Status TraceUnitWriter(
+ std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
+ Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
+ const Slice& key, const size_t value_size,
+ const uint64_t ts);
+ Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
+ Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
+ Status MakeStatisticQPS();
+ int db_version_;
+};
+
+int trace_analyzer_tool(int argc, char** argv);
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // ROCKSDB_LITE