summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/tools/trace_analyzer_tool.h
blob: 4b885b18cc10fef5953d8eac1d5dda0acf6f0922 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once
#ifndef ROCKSDB_LITE

#include <list>
#include <map>
#include <queue>
#include <set>
#include <utility>
#include <vector>

#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/write_batch.h"
#include "trace_replay/trace_replay.h"

namespace ROCKSDB_NAMESPACE {

// Value sizes may be used as denominators. Replacing 0 value sizes with this
// positive integer avoids division error.
extern const size_t kShadowValueSize /* = 10*/;

enum TraceOperationType : int {
  kGet = 0,
  kPut = 1,
  kDelete = 2,
  kSingleDelete = 3,
  kRangeDelete = 4,
  kMerge = 5,
  kIteratorSeek = 6,
  kIteratorSeekForPrev = 7,
  kMultiGet = 8,
  kTaTypeNum = 9
};

struct TraceUnit {
  uint64_t ts;
  uint32_t type;
  uint32_t cf_id;
  size_t value_size;
  std::string key;
};

struct TypeCorrelation {
  uint64_t count;
  uint64_t total_ts;
};

struct StatsUnit {
  uint64_t key_id;
  uint64_t access_count;
  uint64_t latest_ts;
  uint64_t succ_count;  // current only used to count Get if key found
  uint32_t cf_id;
  size_t value_size;
  std::vector<TypeCorrelation> v_correlation;
};

class AnalyzerOptions {
 public:
  std::vector<std::vector<int>> correlation_map;
  std::vector<std::pair<int, int>> correlation_list;

  AnalyzerOptions();

  ~AnalyzerOptions();

  void SparseCorrelationInput(const std::string& in_str);
};

// Note that, for the variable names  in the trace_analyzer,
// Starting with 'a_' means the variable is used for 'accessed_keys'.
// Starting with 'w_' means it is used for 'the whole key space'.
// Ending with '_f' means a file write or reader pointer.
// For example, 'a_count' means 'accessed_keys_count',
// 'w_key_f' means 'whole_key_space_file'.

struct TraceStats {
  uint32_t cf_id;
  std::string cf_name;
  uint64_t a_count;
  uint64_t a_succ_count;
  uint64_t a_key_id;
  uint64_t a_key_size_sqsum;
  uint64_t a_key_size_sum;
  uint64_t a_key_mid;
  uint64_t a_value_size_sqsum;
  uint64_t a_value_size_sum;
  uint64_t a_value_mid;
  uint32_t a_peak_qps;
  double a_ave_qps;
  std::map<std::string, StatsUnit> a_key_stats;
  std::map<uint64_t, uint64_t> a_count_stats;
  std::map<uint64_t, uint64_t> a_key_size_stats;
  std::map<uint64_t, uint64_t> a_value_size_stats;
  std::map<uint32_t, uint32_t> a_qps_stats;
  std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats;
  std::priority_queue<std::pair<uint64_t, std::string>,
                      std::vector<std::pair<uint64_t, std::string>>,
                      std::greater<std::pair<uint64_t, std::string>>>
      top_k_queue;
  std::priority_queue<std::pair<uint64_t, std::string>,
                      std::vector<std::pair<uint64_t, std::string>>,
                      std::greater<std::pair<uint64_t, std::string>>>
      top_k_prefix_access;
  std::priority_queue<std::pair<double, std::string>,
                      std::vector<std::pair<double, std::string>>,
                      std::greater<std::pair<double, std::string>>>
      top_k_prefix_ave;
  std::priority_queue<std::pair<uint32_t, uint32_t>,
                      std::vector<std::pair<uint32_t, uint32_t>>,
                      std::greater<std::pair<uint32_t, uint32_t>>>
      top_k_qps_sec;
  std::list<TraceUnit> time_series;
  std::vector<std::pair<uint64_t, uint64_t>> correlation_output;
  std::map<uint32_t, uint64_t> uni_key_num;

  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> time_series_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_count_dist_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_prefix_cut_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_value_size_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_size_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_num_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_qps_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_top_qps_prefix_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_key_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_prefix_cut_f;

  TraceStats();
  ~TraceStats();
  TraceStats(const TraceStats&) = delete;
  TraceStats& operator=(const TraceStats&) = delete;
  TraceStats(TraceStats&&) = default;
  TraceStats& operator=(TraceStats&&) = default;
};

struct TypeUnit {
  std::string type_name;
  bool enabled;
  uint64_t total_keys;
  uint64_t total_access;
  uint64_t total_succ_access;
  uint32_t sample_count;
  std::map<uint32_t, TraceStats> stats;
  TypeUnit() = default;
  ~TypeUnit() = default;
  TypeUnit(const TypeUnit&) = delete;
  TypeUnit& operator=(const TypeUnit&) = delete;
  TypeUnit(TypeUnit&&) = default;
  TypeUnit& operator=(TypeUnit&&) = default;
};

struct CfUnit {
  uint32_t cf_id;
  uint64_t w_count;  // total keys in this cf if we use the whole key space
  uint64_t a_count;  // the total keys in this cf that are accessed
  std::map<uint64_t, uint64_t> w_key_size_stats;  // whole key space key size
                                                  // statistic this cf
  std::map<uint32_t, uint32_t> cf_qps;
};

class TraceAnalyzer : private TraceRecord::Handler,
                      private WriteBatch::Handler {
 public:
  TraceAnalyzer(std::string& trace_path, std::string& output_path,
                AnalyzerOptions _analyzer_opts);
  ~TraceAnalyzer();

  Status PrepareProcessing();

  Status StartProcessing();

  Status MakeStatistics();

  Status ReProcessing();

  Status EndProcessing();

  Status WriteTraceUnit(TraceUnit& unit);

  std::vector<TypeUnit>& GetTaVector() { return ta_; }

 private:
  using TraceRecord::Handler::Handle;
  Status Handle(const WriteQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;
  Status Handle(const GetQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;
  Status Handle(const IteratorSeekQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;
  Status Handle(const MultiGetQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;

  using WriteBatch::Handler::PutCF;
  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override;

  using WriteBatch::Handler::DeleteCF;
  Status DeleteCF(uint32_t column_family_id, const Slice& key) override;

  using WriteBatch::Handler::SingleDeleteCF;
  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override;

  using WriteBatch::Handler::DeleteRangeCF;
  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override;

  using WriteBatch::Handler::MergeCF;
  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override;

  // The following hanlders are not implemented, return Status::OK() to avoid
  // the running time assertion and other irrelevant falures.
  using WriteBatch::Handler::PutBlobIndexCF;
  Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
                        const Slice& /*value*/) override {
    return Status::OK();
  }

  // The default implementation of LogData does nothing.
  using WriteBatch::Handler::LogData;
  void LogData(const Slice& /*blob*/) override {}

  using WriteBatch::Handler::MarkBeginPrepare;
  Status MarkBeginPrepare(bool = false) override { return Status::OK(); }

  using WriteBatch::Handler::MarkEndPrepare;
  Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkNoop;
  Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkRollback;
  Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkCommit;
  Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkCommitWithTimestamp;
  Status MarkCommitWithTimestamp(const Slice& /*xid*/,
                                 const Slice& /*commit_ts*/) override {
    return Status::OK();
  }

  // Process each trace operation and output the analysis result to
  // stdout/files.
  Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
                              std::vector<uint32_t> cf_ids,
                              std::vector<Slice> keys,
                              std::vector<size_t> value_sizes);

  Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
                              uint32_t cf_id, const Slice& key,
                              size_t value_size);

  ROCKSDB_NAMESPACE::Env* env_;
  EnvOptions env_options_;
  std::unique_ptr<TraceReader> trace_reader_;
  size_t offset_;
  char buffer_[1024];
  // Timestamp of a WriteBatch, used in its iteration.
  uint64_t write_batch_ts_;
  std::string trace_name_;
  std::string output_path_;
  AnalyzerOptions analyzer_opts_;
  uint64_t total_requests_;
  uint64_t total_access_keys_;
  uint64_t total_gets_;
  uint64_t total_writes_;
  uint64_t total_seeks_;
  uint64_t total_seek_prevs_;
  uint64_t total_multigets_;
  uint64_t trace_create_time_;
  uint64_t begin_time_;
  uint64_t end_time_;
  uint64_t time_series_start_;
  uint32_t sample_max_;
  uint32_t cur_time_sec_;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
      trace_sequence_f_;                                    // readable trace
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> qps_f_;  // overall qps
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>
      cf_qps_f_;              // The qps of each CF>
  std::vector<TypeUnit> ta_;  // The main statistic collecting data structure
  std::map<uint32_t, CfUnit> cfs_;  // All the cf_id appears in this trace;
  std::vector<uint32_t> qps_peak_;
  std::vector<double> qps_ave_;

  Status ReadTraceHeader(Trace* header);
  Status ReadTraceFooter(Trace* footer);
  Status ReadTraceRecord(Trace* trace);
  Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id,
                           const std::string& key, const size_t value_size,
                           const uint64_t ts);
  Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type,
                                    const uint64_t& ts, const std::string& key);
  Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats);
  Status CreateOutputFile(
      const std::string& type, const std::string& cf_name,
      const std::string& ending,
      std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr);
  Status CloseOutputFiles();

  void PrintStatistics();
  Status TraceUnitWriter(
      std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
  Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
                            const Slice& key, const size_t value_size,
                            const uint64_t ts);
  Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
  Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
  Status MakeStatisticQPS();
  int db_version_;
};

int trace_analyzer_tool(int argc, char** argv);

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_LITE