diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/tools/trace_analyzer_tool.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/tools/trace_analyzer_tool.cc | 1985 |
1 files changed, 1985 insertions, 0 deletions
diff --git a/src/rocksdb/tools/trace_analyzer_tool.cc b/src/rocksdb/tools/trace_analyzer_tool.cc new file mode 100644 index 00000000..a0186925 --- /dev/null +++ b/src/rocksdb/tools/trace_analyzer_tool.cc @@ -0,0 +1,1985 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#ifdef GFLAGS +#ifdef NUMA +#include <numa.h> +#include <numaif.h> +#endif +#ifndef OS_WIN +#include <unistd.h> +#endif + +#include <cinttypes> +#include <cmath> +#include <cstdio> +#include <cstdlib> +#include <memory> +#include <sstream> +#include <stdexcept> + +#include "db/db_impl.h" +#include "db/memtable.h" +#include "db/write_batch_internal.h" +#include "options/cf_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/status.h" +#include "rocksdb/table_properties.h" +#include "rocksdb/utilities/ldb_cmd.h" +#include "rocksdb/write_batch.h" +#include "table/meta_blocks.h" +#include "table/plain_table_factory.h" +#include "table/table_reader.h" +#include "tools/trace_analyzer_tool.h" +#include "util/coding.h" +#include "util/compression.h" +#include "util/file_reader_writer.h" +#include "util/gflags_compat.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/trace_replay.h" + +using GFLAGS_NAMESPACE::ParseCommandLineFlags; +using GFLAGS_NAMESPACE::RegisterFlagValidator; +using GFLAGS_NAMESPACE::SetUsageMessage; + +DEFINE_string(trace_path, "", "The trace file path."); +DEFINE_string(output_dir, "", "The directory to store the output files."); +DEFINE_string(output_prefix, "trace", + "The prefix used for all the output files."); +DEFINE_bool(output_key_stats, false, + "Output the key access count statistics to file\n" + "for accessed keys:\n" + "file name: <prefix>-<query_type>-<cf_id>-accessed_key_stats.txt\n" + "Format:[cf_id value_size access_keyid access_count]\n" + "for the whole key space keys:\n" + "File name: <prefix>-<query_type>-<cf_id>-whole_key_stats.txt\n" + "Format:[whole_key_space_keyid access_count]"); +DEFINE_bool(output_access_count_stats, false, + "Output the access count distribution statistics to file.\n" + "File name: <prefix>-<query_type>-<cf_id>-accessed_" + "key_count_distribution.txt \n" + "Format:[access_count number_of_access_count]"); +DEFINE_bool(output_time_series, false, + "Output the access time in second of each key, " + "such that we can have the time series data of the queries \n" + "File name: <prefix>-<query_type>-<cf_id>-time_series.txt\n" + "Format:[type_id time_in_sec access_keyid]."); +DEFINE_bool(try_process_corrupted_trace, false, + "In default, trace_analyzer will exit if the trace file is " + "corrupted due to the unexpected tracing cases. If this option " + "is enabled, trace_analyzer will stop reading the trace file, " + "and start analyzing the read-in data."); +DEFINE_int32(output_prefix_cut, 0, + "The number of bytes as prefix to cut the keys.\n" + "If it is enabled, it will generate the following:\n" + "For accessed keys:\n" + "File name: <prefix>-<query_type>-<cf_id>-" + "accessed_key_prefix_cut.txt \n" + "Format:[acessed_keyid access_count_of_prefix " + "number_of_keys_in_prefix average_key_access " + "prefix_succ_ratio prefix]\n" + "For whole key space keys:\n" + "File name: <prefix>-<query_type>-<cf_id>" + "-whole_key_prefix_cut.txt\n" + "Format:[start_keyid_in_whole_keyspace prefix]\n" + "if 'output_qps_stats' and 'top_k' are enabled, it will output:\n" + "File name: <prefix>-<query_type>-<cf_id>" + "-accessed_top_k_qps_prefix_cut.txt\n" + "Format:[the_top_ith_qps_time QPS], [prefix qps_of_this_second]."); +DEFINE_bool(convert_to_human_readable_trace, false, + "Convert the binary trace file to a human readable txt file " + "for further processing. " + "This file will be extremely large " + "(similar size as the original binary trace file). " + "You can specify 'no_key' to reduce the size, if key is not " + "needed in the next step.\n" + "File name: <prefix>_human_readable_trace.txt\n" + "Format:[type_id cf_id value_size time_in_micorsec <key>]."); +DEFINE_bool(output_qps_stats, false, + "Output the query per second(qps) statistics \n" + "For the overall qps, it will contain all qps of each query type. " + "The time is started from the first trace record\n" + "File name: <prefix>_qps_stats.txt\n" + "Format: [qps_type_1 qps_type_2 ...... overall_qps]\n" + "For each cf and query, it will have its own qps output.\n" + "File name: <prefix>-<query_type>-<cf_id>_qps_stats.txt \n" + "Format:[query_count_in_this_second]."); +DEFINE_bool(no_print, false, "Do not print out any result"); +DEFINE_string( + print_correlation, "", + "intput format: [correlation pairs][.,.]\n" + "Output the query correlations between the pairs of query types " + "listed in the parameter, input should select the operations from:\n" + "get, put, delete, single_delete, rangle_delete, merge. No space " + "between the pairs separated by commar. Example: =[get,get]... " + "It will print out the number of pairs of 'A after B' and " + "the average time interval between the two query."); +DEFINE_string(key_space_dir, "", + "<the directory stores full key space files> \n" + "The key space files should be: <column family id>.txt"); +DEFINE_bool(analyze_get, false, "Analyze the Get query."); +DEFINE_bool(analyze_put, false, "Analyze the Put query."); +DEFINE_bool(analyze_delete, false, "Analyze the Delete query."); +DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query."); +DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query."); +DEFINE_bool(analyze_merge, false, "Analyze the Merge query."); +DEFINE_bool(analyze_iterator, false, + " Analyze the iterate query like seek() and seekForPrev()."); +DEFINE_bool(no_key, false, + " Does not output the key to the result files to make smaller."); +DEFINE_bool(print_overall_stats, true, + " Print the stats of the whole trace, " + "like total requests, keys, and etc."); +DEFINE_bool(output_key_distribution, false, "Print the key size distribution."); +DEFINE_bool( + output_value_distribution, false, + "Out put the value size distribution, only available for Put and Merge.\n" + "File name: <prefix>-<query_type>-<cf_id>" + "-accessed_value_size_distribution.txt\n" + "Format:[Number_of_value_size_between x and " + "x+value_interval is: <the count>]"); +DEFINE_int32(print_top_k_access, 1, + "<top K of the variables to be printed> " + "Print the top k accessed keys, top k accessed prefix " + "and etc."); +DEFINE_int32(output_ignore_count, 0, + "<threshold>, ignores the access count <= this value, " + "it will shorter the output."); +DEFINE_int32(value_interval, 8, + "To output the value distribution, we need to set the value " + "intervals and make the statistic of the value size distribution " + "in different intervals. The default is 8."); +DEFINE_double(sample_ratio, 1.0, + "If the trace size is extremely huge or user want to sample " + "the trace when analyzing, sample ratio can be set (0, 1.0]"); + +namespace rocksdb { + +std::map<std::string, int> taOptToIndex = { + {"get", 0}, {"put", 1}, + {"delete", 2}, {"single_delete", 3}, + {"range_delete", 4}, {"merge", 5}, + {"iterator_Seek", 6}, {"iterator_SeekForPrev", 7}}; + +std::map<int, std::string> taIndexToOpt = { + {0, "get"}, {1, "put"}, + {2, "delete"}, {3, "single_delete"}, + {4, "range_delete"}, {5, "merge"}, + {6, "iterator_Seek"}, {7, "iterator_SeekForPrev"}}; + +namespace { + +uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) { + if (op1 == 0 || op2 == 0) { + return 0; + } + if (port::kMaxUint64 / op1 < op2) { + return op1; + } + return (op1 * op2); +} + +void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) { + Slice buf(buffer); + GetFixed32(&buf, cf_id); + GetLengthPrefixedSlice(&buf, key); +} + +} // namespace + +// The default constructor of AnalyzerOptions +AnalyzerOptions::AnalyzerOptions() + : correlation_map(kTaTypeNum, std::vector<int>(kTaTypeNum, -1)) {} + +AnalyzerOptions::~AnalyzerOptions() {} + +void AnalyzerOptions::SparseCorrelationInput(const std::string& in_str) { + std::string cur = in_str; + if (cur.size() == 0) { + return; + } + while (!cur.empty()) { + if (cur.compare(0, 1, "[") != 0) { + fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str()); + exit(1); + } + std::string opt1, opt2; + std::size_t split = cur.find_first_of(","); + if (split != std::string::npos) { + opt1 = cur.substr(1, split - 1); + } else { + fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str()); + exit(1); + } + std::size_t end = cur.find_first_of("]"); + if (end != std::string::npos) { + opt2 = cur.substr(split + 1, end - split - 1); + } else { + fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str()); + exit(1); + } + cur = cur.substr(end + 1); + + if (taOptToIndex.find(opt1) != taOptToIndex.end() && + taOptToIndex.find(opt2) != taOptToIndex.end()) { + correlation_list.push_back( + std::make_pair(taOptToIndex[opt1], taOptToIndex[opt2])); + } else { + fprintf(stderr, "Invalid correlation input: %s\n", in_str.c_str()); + exit(1); + } + } + + int sequence = 0; + for (auto& it : correlation_list) { + correlation_map[it.first][it.second] = sequence; + sequence++; + } + return; +} + +// The trace statistic struct constructor +TraceStats::TraceStats() { + cf_id = 0; + cf_name = "0"; + a_count = 0; + a_key_id = 0; + a_key_size_sqsum = 0; + a_key_size_sum = 0; + a_key_mid = 0; + a_value_size_sqsum = 0; + a_value_size_sum = 0; + a_value_mid = 0; + a_peak_qps = 0; + a_ave_qps = 0.0; +} + +TraceStats::~TraceStats() {} + +// The trace analyzer constructor +TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path, + AnalyzerOptions _analyzer_opts) + : trace_name_(trace_path), + output_path_(output_path), + analyzer_opts_(_analyzer_opts) { + rocksdb::EnvOptions env_options; + env_ = rocksdb::Env::Default(); + offset_ = 0; + c_time_ = 0; + total_requests_ = 0; + total_access_keys_ = 0; + total_gets_ = 0; + total_writes_ = 0; + trace_create_time_ = 0; + begin_time_ = 0; + end_time_ = 0; + time_series_start_ = 0; + cur_time_sec_ = 0; + if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) { + sample_max_ = 1; + } else { + sample_max_ = static_cast<uint32_t>(1.0 / FLAGS_sample_ratio); + } + + ta_.resize(kTaTypeNum); + ta_[0].type_name = "get"; + if (FLAGS_analyze_get) { + ta_[0].enabled = true; + } else { + ta_[0].enabled = false; + } + ta_[1].type_name = "put"; + if (FLAGS_analyze_put) { + ta_[1].enabled = true; + } else { + ta_[1].enabled = false; + } + ta_[2].type_name = "delete"; + if (FLAGS_analyze_delete) { + ta_[2].enabled = true; + } else { + ta_[2].enabled = false; + } + ta_[3].type_name = "single_delete"; + if (FLAGS_analyze_single_delete) { + ta_[3].enabled = true; + } else { + ta_[3].enabled = false; + } + ta_[4].type_name = "range_delete"; + if (FLAGS_analyze_range_delete) { + ta_[4].enabled = true; + } else { + ta_[4].enabled = false; + } + ta_[5].type_name = "merge"; + if (FLAGS_analyze_merge) { + ta_[5].enabled = true; + } else { + ta_[5].enabled = false; + } + ta_[6].type_name = "iterator_Seek"; + if (FLAGS_analyze_iterator) { + ta_[6].enabled = true; + } else { + ta_[6].enabled = false; + } + ta_[7].type_name = "iterator_SeekForPrev"; + if (FLAGS_analyze_iterator) { + ta_[7].enabled = true; + } else { + ta_[7].enabled = false; + } + for (int i = 0; i < kTaTypeNum; i++) { + ta_[i].sample_count = 0; + } +} + +TraceAnalyzer::~TraceAnalyzer() {} + +// Prepare the processing +// Initiate the global trace reader and writer here +Status TraceAnalyzer::PrepareProcessing() { + Status s; + // Prepare the trace reader + s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_); + if (!s.ok()) { + return s; + } + + // Prepare and open the trace sequence file writer if needed + if (FLAGS_convert_to_human_readable_trace) { + std::string trace_sequence_name; + trace_sequence_name = + output_path_ + "/" + FLAGS_output_prefix + "-human_readable_trace.txt"; + s = env_->NewWritableFile(trace_sequence_name, &trace_sequence_f_, + env_options_); + if (!s.ok()) { + return s; + } + } + + // prepare the general QPS file writer + if (FLAGS_output_qps_stats) { + std::string qps_stats_name; + qps_stats_name = + output_path_ + "/" + FLAGS_output_prefix + "-qps_stats.txt"; + s = env_->NewWritableFile(qps_stats_name, &qps_f_, env_options_); + if (!s.ok()) { + return s; + } + + qps_stats_name = + output_path_ + "/" + FLAGS_output_prefix + "-cf_qps_stats.txt"; + s = env_->NewWritableFile(qps_stats_name, &cf_qps_f_, env_options_); + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + +Status TraceAnalyzer::ReadTraceHeader(Trace* header) { + assert(header != nullptr); + Status s = ReadTraceRecord(header); + if (!s.ok()) { + return s; + } + if (header->type != kTraceBegin) { + return Status::Corruption("Corrupted trace file. Incorrect header."); + } + if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { + return Status::Corruption("Corrupted trace file. Incorrect magic."); + } + + return s; +} + +Status TraceAnalyzer::ReadTraceFooter(Trace* footer) { + assert(footer != nullptr); + Status s = ReadTraceRecord(footer); + if (!s.ok()) { + return s; + } + if (footer->type != kTraceEnd) { + return Status::Corruption("Corrupted trace file. Incorrect footer."); + } + return s; +} + +Status TraceAnalyzer::ReadTraceRecord(Trace* trace) { + assert(trace != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + + Slice enc_slice = Slice(encoded_trace); + GetFixed64(&enc_slice, &trace->ts); + trace->type = static_cast<TraceType>(enc_slice[0]); + enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); + trace->payload = enc_slice.ToString(); + return s; +} + +// process the trace itself and redirect the trace content +// to different operation type handler. With different race +// format, this function can be changed +Status TraceAnalyzer::StartProcessing() { + Status s; + Trace header; + s = ReadTraceHeader(&header); + if (!s.ok()) { + fprintf(stderr, "Cannot read the header\n"); + return s; + } + trace_create_time_ = header.ts; + if (FLAGS_output_time_series) { + time_series_start_ = header.ts; + } + + Trace trace; + while (s.ok()) { + trace.reset(); + s = ReadTraceRecord(&trace); + if (!s.ok()) { + break; + } + + total_requests_++; + end_time_ = trace.ts; + if (trace.type == kTraceWrite) { + total_writes_++; + c_time_ = trace.ts; + WriteBatch batch(trace.payload); + + // Note that, if the write happens in a transaction, + // 'Write' will be called twice, one for Prepare, one for + // Commit. Thus, in the trace, for the same WriteBatch, there + // will be two reords if it is in a transaction. Here, we only + // process the reord that is committed. If write is non-transaction, + // HasBeginPrepare()==false, so we process it normally. + if (batch.HasBeginPrepare() && !batch.HasCommit()) { + continue; + } + TraceWriteHandler write_handler(this); + s = batch.Iterate(&write_handler); + if (!s.ok()) { + fprintf(stderr, "Cannot process the write batch in the trace\n"); + return s; + } + } else if (trace.type == kTraceGet) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKeyFromString(trace.payload, &cf_id, &key); + total_gets_++; + + s = HandleGet(cf_id, key.ToString(), trace.ts, 1); + if (!s.ok()) { + fprintf(stderr, "Cannot process the get in the trace\n"); + return s; + } + } else if (trace.type == kTraceIteratorSeek || + trace.type == kTraceIteratorSeekForPrev) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKeyFromString(trace.payload, &cf_id, &key); + s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type); + if (!s.ok()) { + fprintf(stderr, "Cannot process the iterator in the trace\n"); + return s; + } + } else if (trace.type == kTraceEnd) { + break; + } + } + if (s.IsIncomplete()) { + // Fix it: Reaching eof returns Incomplete status at the moment. + // + return Status::OK(); + } + return s; +} + +// After the trace is processed by StartProcessing, the statistic data +// is stored in the map or other in memory data structures. To get the +// other statistic result such as key size distribution, value size +// distribution, these data structures are re-processed here. +Status TraceAnalyzer::MakeStatistics() { + int ret; + Status s; + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled) { + continue; + } + for (auto& stat : ta_[type].stats) { + stat.second.a_key_id = 0; + for (auto& record : stat.second.a_key_stats) { + record.second.key_id = stat.second.a_key_id; + stat.second.a_key_id++; + if (record.second.access_count <= + static_cast<uint64_t>(FLAGS_output_ignore_count)) { + continue; + } + + // Generate the key access count distribution data + if (FLAGS_output_access_count_stats) { + if (stat.second.a_count_stats.find(record.second.access_count) == + stat.second.a_count_stats.end()) { + stat.second.a_count_stats[record.second.access_count] = 1; + } else { + stat.second.a_count_stats[record.second.access_count]++; + } + } + + // Generate the key size distribution data + if (FLAGS_output_key_distribution) { + if (stat.second.a_key_size_stats.find(record.first.size()) == + stat.second.a_key_size_stats.end()) { + stat.second.a_key_size_stats[record.first.size()] = 1; + } else { + stat.second.a_key_size_stats[record.first.size()]++; + } + } + + if (!FLAGS_print_correlation.empty()) { + s = MakeStatisticCorrelation(stat.second, record.second); + if (!s.ok()) { + return s; + } + } + } + + // Output the prefix cut or the whole content of the accessed key space + if (FLAGS_output_key_stats || FLAGS_output_prefix_cut > 0) { + s = MakeStatisticKeyStatsOrPrefix(stat.second); + if (!s.ok()) { + return s; + } + } + + // output the access count distribution + if (FLAGS_output_access_count_stats && stat.second.a_count_dist_f) { + for (auto& record : stat.second.a_count_stats) { + ret = sprintf(buffer_, "access_count: %" PRIu64 " num: %" PRIu64 "\n", + record.first, record.second); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.second.a_count_dist_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write access count distribution file failed\n"); + return s; + } + } + } + + // find the medium of the key size + uint64_t k_count = 0; + bool get_mid = false; + for (auto& record : stat.second.a_key_size_stats) { + k_count += record.second; + if (!get_mid && k_count >= stat.second.a_key_mid) { + stat.second.a_key_mid = record.first; + get_mid = true; + } + if (FLAGS_output_key_distribution && stat.second.a_key_size_f) { + ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 "\n", record.first, + record.second); + if (ret < 0) { + return Status::IOError("Format output failed"); + } + std::string printout(buffer_); + s = stat.second.a_key_size_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write key size distribution file failed\n"); + return s; + } + } + } + + // output the value size distribution + uint64_t v_begin = 0, v_end = 0, v_count = 0; + get_mid = false; + for (auto& record : stat.second.a_value_size_stats) { + v_begin = v_end; + v_end = (record.first + 1) * FLAGS_value_interval; + v_count += record.second; + if (!get_mid && v_count >= stat.second.a_count / 2) { + stat.second.a_value_mid = (v_begin + v_end) / 2; + get_mid = true; + } + if (FLAGS_output_value_distribution && stat.second.a_value_size_f && + (type == TraceOperationType::kPut || + type == TraceOperationType::kMerge)) { + ret = sprintf(buffer_, + "Number_of_value_size_between %" PRIu64 " and %" PRIu64 + " is: %" PRIu64 "\n", + v_begin, v_end, record.second); + if (ret < 0) { + return Status::IOError("Format output failed"); + } + std::string printout(buffer_); + s = stat.second.a_value_size_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write value size distribution file failed\n"); + return s; + } + } + } + } + } + + // Make the QPS statistics + if (FLAGS_output_qps_stats) { + s = MakeStatisticQPS(); + if (!s.ok()) { + return s; + } + } + + return Status::OK(); +} + +// Process the statistics of the key access and +// prefix of the accessed keys if required +Status TraceAnalyzer::MakeStatisticKeyStatsOrPrefix(TraceStats& stats) { + int ret; + Status s; + std::string prefix = "0"; + uint64_t prefix_access = 0; + uint64_t prefix_count = 0; + uint64_t prefix_succ_access = 0; + double prefix_ave_access = 0.0; + stats.a_succ_count = 0; + for (auto& record : stats.a_key_stats) { + // write the key access statistic file + if (!stats.a_key_f) { + return Status::IOError("Failed to open accessed_key_stats file."); + } + stats.a_succ_count += record.second.succ_count; + double succ_ratio = 0.0; + if (record.second.access_count > 0) { + succ_ratio = (static_cast<double>(record.second.succ_count)) / + record.second.access_count; + } + ret = sprintf(buffer_, "%u %zu %" PRIu64 " %" PRIu64 " %f\n", + record.second.cf_id, record.second.value_size, + record.second.key_id, record.second.access_count, succ_ratio); + if (ret < 0) { + return Status::IOError("Format output failed"); + } + std::string printout(buffer_); + s = stats.a_key_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write key access file failed\n"); + return s; + } + + // write the prefix cut of the accessed keys + if (FLAGS_output_prefix_cut > 0 && stats.a_prefix_cut_f) { + if (record.first.compare(0, FLAGS_output_prefix_cut, prefix) != 0) { + std::string prefix_out = rocksdb::LDBCommand::StringToHex(prefix); + if (prefix_count == 0) { + prefix_ave_access = 0.0; + } else { + prefix_ave_access = + (static_cast<double>(prefix_access)) / prefix_count; + } + double prefix_succ_ratio = 0.0; + if (prefix_access > 0) { + prefix_succ_ratio = + (static_cast<double>(prefix_succ_access)) / prefix_access; + } + ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 " %" PRIu64 " %f %f %s\n", + record.second.key_id, prefix_access, prefix_count, + prefix_ave_access, prefix_succ_ratio, prefix_out.c_str()); + if (ret < 0) { + return Status::IOError("Format output failed"); + } + std::string pout(buffer_); + s = stats.a_prefix_cut_f->Append(pout); + if (!s.ok()) { + fprintf(stderr, "Write accessed key prefix file failed\n"); + return s; + } + + // make the top k statistic for the prefix + if (static_cast<int32_t>(stats.top_k_prefix_access.size()) < + FLAGS_print_top_k_access) { + stats.top_k_prefix_access.push( + std::make_pair(prefix_access, prefix_out)); + } else { + if (prefix_access > stats.top_k_prefix_access.top().first) { + stats.top_k_prefix_access.pop(); + stats.top_k_prefix_access.push( + std::make_pair(prefix_access, prefix_out)); + } + } + + if (static_cast<int32_t>(stats.top_k_prefix_ave.size()) < + FLAGS_print_top_k_access) { + stats.top_k_prefix_ave.push( + std::make_pair(prefix_ave_access, prefix_out)); + } else { + if (prefix_ave_access > stats.top_k_prefix_ave.top().first) { + stats.top_k_prefix_ave.pop(); + stats.top_k_prefix_ave.push( + std::make_pair(prefix_ave_access, prefix_out)); + } + } + + prefix = record.first.substr(0, FLAGS_output_prefix_cut); + prefix_access = 0; + prefix_count = 0; + prefix_succ_access = 0; + } + prefix_access += record.second.access_count; + prefix_count += 1; + prefix_succ_access += record.second.succ_count; + } + } + return Status::OK(); +} + +// Process the statistics of different query type +// correlations +Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats, + StatsUnit& unit) { + if (stats.correlation_output.size() != + analyzer_opts_.correlation_list.size()) { + return Status::Corruption("Cannot make the statistic of correlation."); + } + + for (int i = 0; i < static_cast<int>(analyzer_opts_.correlation_list.size()); + i++) { + if (i >= static_cast<int>(stats.correlation_output.size()) || + i >= static_cast<int>(unit.v_correlation.size())) { + break; + } + stats.correlation_output[i].first += unit.v_correlation[i].count; + stats.correlation_output[i].second += unit.v_correlation[i].total_ts; + } + return Status::OK(); +} + +// Process the statistics of QPS +Status TraceAnalyzer::MakeStatisticQPS() { + if(begin_time_ == 0) { + begin_time_ = trace_create_time_; + } + uint32_t duration = + static_cast<uint32_t>((end_time_ - begin_time_) / 1000000); + int ret; + Status s; + std::vector<std::vector<uint32_t>> type_qps( + duration, std::vector<uint32_t>(kTaTypeNum + 1, 0)); + std::vector<uint64_t> qps_sum(kTaTypeNum + 1, 0); + std::vector<uint32_t> qps_peak(kTaTypeNum + 1, 0); + qps_ave_.resize(kTaTypeNum + 1); + + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled) { + continue; + } + for (auto& stat : ta_[type].stats) { + uint32_t time_line = 0; + uint64_t cf_qps_sum = 0; + for (auto& time_it : stat.second.a_qps_stats) { + if (time_it.first >= duration) { + continue; + } + type_qps[time_it.first][kTaTypeNum] += time_it.second; + type_qps[time_it.first][type] += time_it.second; + cf_qps_sum += time_it.second; + if (time_it.second > stat.second.a_peak_qps) { + stat.second.a_peak_qps = time_it.second; + } + if (stat.second.a_qps_f) { + while (time_line < time_it.first) { + ret = sprintf(buffer_, "%u\n", 0); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.second.a_qps_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write QPS file failed\n"); + return s; + } + time_line++; + } + ret = sprintf(buffer_, "%u\n", time_it.second); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.second.a_qps_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write QPS file failed\n"); + return s; + } + if (time_line == time_it.first) { + time_line++; + } + } + + // Process the top k QPS peaks + if (FLAGS_output_prefix_cut > 0) { + if (static_cast<int32_t>(stat.second.top_k_qps_sec.size()) < + FLAGS_print_top_k_access) { + stat.second.top_k_qps_sec.push( + std::make_pair(time_it.second, time_it.first)); + } else { + if (stat.second.top_k_qps_sec.size() > 0 && + stat.second.top_k_qps_sec.top().first < time_it.second) { + stat.second.top_k_qps_sec.pop(); + stat.second.top_k_qps_sec.push( + std::make_pair(time_it.second, time_it.first)); + } + } + } + } + if (duration == 0) { + stat.second.a_ave_qps = 0; + } else { + stat.second.a_ave_qps = (static_cast<double>(cf_qps_sum)) / duration; + } + + // Output the accessed unique key number change overtime + if (stat.second.a_key_num_f) { + uint64_t cur_uni_key = + static_cast<uint64_t>(stat.second.a_key_stats.size()); + double cur_ratio = 0.0; + uint64_t cur_num = 0; + for (uint32_t i = 0; i < duration; i++) { + auto find_time = stat.second.uni_key_num.find(i); + if (find_time != stat.second.uni_key_num.end()) { + cur_ratio = (static_cast<double>(find_time->second)) / cur_uni_key; + cur_num = find_time->second; + } + ret = sprintf(buffer_, "%" PRIu64 " %.12f\n", cur_num, cur_ratio); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.second.a_key_num_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, + "Write accessed unique key number change file failed\n"); + return s; + } + } + } + + // output the prefix of top k access peak + if (FLAGS_output_prefix_cut > 0 && stat.second.a_top_qps_prefix_f) { + while (!stat.second.top_k_qps_sec.empty()) { + ret = sprintf(buffer_, "At time: %u with QPS: %u\n", + stat.second.top_k_qps_sec.top().second, + stat.second.top_k_qps_sec.top().first); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.second.a_top_qps_prefix_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write prefix QPS top K file failed\n"); + return s; + } + uint32_t qps_time = stat.second.top_k_qps_sec.top().second; + stat.second.top_k_qps_sec.pop(); + if (stat.second.a_qps_prefix_stats.find(qps_time) != + stat.second.a_qps_prefix_stats.end()) { + for (auto& qps_prefix : stat.second.a_qps_prefix_stats[qps_time]) { + std::string qps_prefix_out = + rocksdb::LDBCommand::StringToHex(qps_prefix.first); + ret = sprintf(buffer_, "The prefix: %s Access count: %u\n", + qps_prefix_out.c_str(), qps_prefix.second); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string pout(buffer_); + s = stat.second.a_top_qps_prefix_f->Append(pout); + if (!s.ok()) { + fprintf(stderr, "Write prefix QPS top K file failed\n"); + return s; + } + } + } + } + } + } + } + + if (qps_f_) { + for (uint32_t i = 0; i < duration; i++) { + for (int type = 0; type <= kTaTypeNum; type++) { + if (type < kTaTypeNum) { + ret = sprintf(buffer_, "%u ", type_qps[i][type]); + } else { + ret = sprintf(buffer_, "%u\n", type_qps[i][type]); + } + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = qps_f_->Append(printout); + if (!s.ok()) { + return s; + } + qps_sum[type] += type_qps[i][type]; + if (type_qps[i][type] > qps_peak[type]) { + qps_peak[type] = type_qps[i][type]; + } + } + } + } + + if (cf_qps_f_) { + int cfs_size = static_cast<uint32_t>(cfs_.size()); + uint32_t v; + for (uint32_t i = 0; i < duration; i++) { + for (int cf = 0; cf < cfs_size; cf++) { + if (cfs_[cf].cf_qps.find(i) != cfs_[cf].cf_qps.end()) { + v = cfs_[cf].cf_qps[i]; + } else { + v = 0; + } + if (cf < cfs_size - 1) { + ret = sprintf(buffer_, "%u ", v); + } else { + ret = sprintf(buffer_, "%u\n", v); + } + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = cf_qps_f_->Append(printout); + if (!s.ok()) { + return s; + } + } + } + } + + qps_peak_ = qps_peak; + for (int type = 0; type <= kTaTypeNum; type++) { + if (duration == 0) { + qps_ave_[type] = 0; + } else { + qps_ave_[type] = (static_cast<double>(qps_sum[type])) / duration; + } + } + + return Status::OK(); +} + +// In reprocessing, if we have the whole key space +// we can output the access count of all keys in a cf +// we can make some statistics of the whole key space +// also, we output the top k accessed keys here +Status TraceAnalyzer::ReProcessing() { + int ret; + Status s; + for (auto& cf_it : cfs_) { + uint32_t cf_id = cf_it.first; + + // output the time series; + if (FLAGS_output_time_series) { + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled || + ta_[type].stats.find(cf_id) == ta_[type].stats.end()) { + continue; + } + TraceStats& stat = ta_[type].stats[cf_id]; + if (!stat.time_series_f) { + fprintf(stderr, "Cannot write time_series of '%s' in '%u'\n", + ta_[type].type_name.c_str(), cf_id); + continue; + } + while (!stat.time_series.empty()) { + uint64_t key_id = 0; + auto found = stat.a_key_stats.find(stat.time_series.front().key); + if (found != stat.a_key_stats.end()) { + key_id = found->second.key_id; + } + ret = sprintf(buffer_, "%u %" PRIu64 " %" PRIu64 "\n", + stat.time_series.front().type, + stat.time_series.front().ts, key_id); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.time_series_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write time series file failed\n"); + return s; + } + stat.time_series.pop_front(); + } + } + } + + // process the whole key space if needed + if (!FLAGS_key_space_dir.empty()) { + std::string whole_key_path = + FLAGS_key_space_dir + "/" + std::to_string(cf_id) + ".txt"; + std::string input_key, get_key; + std::vector<std::string> prefix(kTaTypeNum); + std::istringstream iss; + bool has_data = true; + s = env_->NewSequentialFile(whole_key_path, &wkey_input_f_, env_options_); + if (!s.ok()) { + fprintf(stderr, "Cannot open the whole key space file of CF: %u\n", + cf_id); + wkey_input_f_.reset(); + } + if (wkey_input_f_) { + for (cfs_[cf_id].w_count = 0; + ReadOneLine(&iss, wkey_input_f_.get(), &get_key, &has_data, &s); + ++cfs_[cf_id].w_count) { + if (!s.ok()) { + fprintf(stderr, "Read whole key space file failed\n"); + return s; + } + + input_key = rocksdb::LDBCommand::HexToString(get_key); + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled) { + continue; + } + TraceStats& stat = ta_[type].stats[cf_id]; + if (stat.w_key_f) { + if (stat.a_key_stats.find(input_key) != stat.a_key_stats.end()) { + ret = sprintf(buffer_, "%" PRIu64 " %" PRIu64 "\n", + cfs_[cf_id].w_count, + stat.a_key_stats[input_key].access_count); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.w_key_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, "Write whole key space access file failed\n"); + return s; + } + } + } + + // Output the prefix cut file of the whole key space + if (FLAGS_output_prefix_cut > 0 && stat.w_prefix_cut_f) { + if (input_key.compare(0, FLAGS_output_prefix_cut, prefix[type]) != + 0) { + prefix[type] = input_key.substr(0, FLAGS_output_prefix_cut); + std::string prefix_out = + rocksdb::LDBCommand::StringToHex(prefix[type]); + ret = sprintf(buffer_, "%" PRIu64 " %s\n", cfs_[cf_id].w_count, + prefix_out.c_str()); + if (ret < 0) { + return Status::IOError("Format the output failed"); + } + std::string printout(buffer_); + s = stat.w_prefix_cut_f->Append(printout); + if (!s.ok()) { + fprintf(stderr, + "Write whole key space prefix cut file failed\n"); + return s; + } + } + } + } + + // Make the statistics fo the key size distribution + if (FLAGS_output_key_distribution) { + if (cfs_[cf_id].w_key_size_stats.find(input_key.size()) == + cfs_[cf_id].w_key_size_stats.end()) { + cfs_[cf_id].w_key_size_stats[input_key.size()] = 1; + } else { + cfs_[cf_id].w_key_size_stats[input_key.size()]++; + } + } + } + } + } + + // process the top k accessed keys + if (FLAGS_print_top_k_access > 0) { + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled || + ta_[type].stats.find(cf_id) == ta_[type].stats.end()) { + continue; + } + TraceStats& stat = ta_[type].stats[cf_id]; + for (auto& record : stat.a_key_stats) { + if (static_cast<int32_t>(stat.top_k_queue.size()) < + FLAGS_print_top_k_access) { + stat.top_k_queue.push( + std::make_pair(record.second.access_count, record.first)); + } else { + if (record.second.access_count > stat.top_k_queue.top().first) { + stat.top_k_queue.pop(); + stat.top_k_queue.push( + std::make_pair(record.second.access_count, record.first)); + } + } + } + } + } + } + return Status::OK(); +} + +// End the processing, print the requested results +Status TraceAnalyzer::EndProcessing() { + if (trace_sequence_f_) { + trace_sequence_f_->Close(); + } + if (FLAGS_no_print) { + return Status::OK(); + } + PrintStatistics(); + CloseOutputFiles(); + return Status::OK(); +} + +// Insert the corresponding key statistics to the correct type +// and correct CF, output the time-series file if needed +Status TraceAnalyzer::KeyStatsInsertion(const uint32_t& type, + const uint32_t& cf_id, + const std::string& key, + const size_t value_size, + const uint64_t ts) { + Status s; + StatsUnit unit; + unit.key_id = 0; + unit.cf_id = cf_id; + unit.value_size = value_size; + unit.access_count = 1; + unit.latest_ts = ts; + if (type != TraceOperationType::kGet || value_size > 0) { + unit.succ_count = 1; + } else { + unit.succ_count = 0; + } + unit.v_correlation.resize(analyzer_opts_.correlation_list.size()); + for (int i = 0; + i < (static_cast<int>(analyzer_opts_.correlation_list.size())); i++) { + unit.v_correlation[i].count = 0; + unit.v_correlation[i].total_ts = 0; + } + std::string prefix; + if (FLAGS_output_prefix_cut > 0) { + prefix = key.substr(0, FLAGS_output_prefix_cut); + } + + if (begin_time_ == 0) { + begin_time_ = ts; + } + uint32_t time_in_sec; + if (ts < begin_time_) { + time_in_sec = 0; + } else { + time_in_sec = static_cast<uint32_t>((ts - begin_time_) / 1000000); + } + + uint64_t dist_value_size = value_size / FLAGS_value_interval; + auto found_stats = ta_[type].stats.find(cf_id); + if (found_stats == ta_[type].stats.end()) { + ta_[type].stats[cf_id].cf_id = cf_id; + ta_[type].stats[cf_id].cf_name = std::to_string(cf_id); + ta_[type].stats[cf_id].a_count = 1; + ta_[type].stats[cf_id].a_key_id = 0; + ta_[type].stats[cf_id].a_key_size_sqsum = MultiplyCheckOverflow( + static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size())); + ta_[type].stats[cf_id].a_key_size_sum = key.size(); + ta_[type].stats[cf_id].a_value_size_sqsum = MultiplyCheckOverflow( + static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size)); + ta_[type].stats[cf_id].a_value_size_sum = value_size; + s = OpenStatsOutputFiles(ta_[type].type_name, ta_[type].stats[cf_id]); + if (!FLAGS_print_correlation.empty()) { + s = StatsUnitCorrelationUpdate(unit, type, ts, key); + } + ta_[type].stats[cf_id].a_key_stats[key] = unit; + ta_[type].stats[cf_id].a_value_size_stats[dist_value_size] = 1; + ta_[type].stats[cf_id].a_qps_stats[time_in_sec] = 1; + ta_[type].stats[cf_id].correlation_output.resize( + analyzer_opts_.correlation_list.size()); + if (FLAGS_output_prefix_cut > 0) { + std::map<std::string, uint32_t> tmp_qps_map; + tmp_qps_map[prefix] = 1; + ta_[type].stats[cf_id].a_qps_prefix_stats[time_in_sec] = tmp_qps_map; + } + if (time_in_sec != cur_time_sec_) { + ta_[type].stats[cf_id].uni_key_num[cur_time_sec_] = + static_cast<uint64_t>(ta_[type].stats[cf_id].a_key_stats.size()); + cur_time_sec_ = time_in_sec; + } + } else { + found_stats->second.a_count++; + found_stats->second.a_key_size_sqsum += MultiplyCheckOverflow( + static_cast<uint64_t>(key.size()), static_cast<uint64_t>(key.size())); + found_stats->second.a_key_size_sum += key.size(); + found_stats->second.a_value_size_sqsum += MultiplyCheckOverflow( + static_cast<uint64_t>(value_size), static_cast<uint64_t>(value_size)); + found_stats->second.a_value_size_sum += value_size; + auto found_key = found_stats->second.a_key_stats.find(key); + if (found_key == found_stats->second.a_key_stats.end()) { + found_stats->second.a_key_stats[key] = unit; + } else { + found_key->second.access_count++; + if (type != TraceOperationType::kGet || value_size > 0) { + found_key->second.succ_count++; + } + if (!FLAGS_print_correlation.empty()) { + s = StatsUnitCorrelationUpdate(found_key->second, type, ts, key); + } + } + if (time_in_sec != cur_time_sec_) { + found_stats->second.uni_key_num[cur_time_sec_] = + static_cast<uint64_t>(found_stats->second.a_key_stats.size()); + cur_time_sec_ = time_in_sec; + } + + auto found_value = + found_stats->second.a_value_size_stats.find(dist_value_size); + if (found_value == found_stats->second.a_value_size_stats.end()) { + found_stats->second.a_value_size_stats[dist_value_size] = 1; + } else { + found_value->second++; + } + + auto found_qps = found_stats->second.a_qps_stats.find(time_in_sec); + if (found_qps == found_stats->second.a_qps_stats.end()) { + found_stats->second.a_qps_stats[time_in_sec] = 1; + } else { + found_qps->second++; + } + + if (FLAGS_output_prefix_cut > 0) { + auto found_qps_prefix = + found_stats->second.a_qps_prefix_stats.find(time_in_sec); + if (found_qps_prefix == found_stats->second.a_qps_prefix_stats.end()) { + std::map<std::string, uint32_t> tmp_qps_map; + found_stats->second.a_qps_prefix_stats[time_in_sec] = tmp_qps_map; + } + if (found_stats->second.a_qps_prefix_stats[time_in_sec].find(prefix) == + found_stats->second.a_qps_prefix_stats[time_in_sec].end()) { + found_stats->second.a_qps_prefix_stats[time_in_sec][prefix] = 1; + } else { + found_stats->second.a_qps_prefix_stats[time_in_sec][prefix]++; + } + } + } + + if (cfs_.find(cf_id) == cfs_.end()) { + CfUnit cf_unit; + cf_unit.cf_id = cf_id; + cf_unit.w_count = 0; + cf_unit.a_count = 0; + cfs_[cf_id] = cf_unit; + } + + if (FLAGS_output_qps_stats) { + cfs_[cf_id].cf_qps[time_in_sec]++; + } + + if (FLAGS_output_time_series) { + TraceUnit trace_u; + trace_u.type = type; + trace_u.key = key; + trace_u.value_size = value_size; + trace_u.ts = (ts - time_series_start_) / 1000000; + trace_u.cf_id = cf_id; + ta_[type].stats[cf_id].time_series.push_back(trace_u); + } + + return Status::OK(); +} + +// Update the correlation unit of each key if enabled +Status TraceAnalyzer::StatsUnitCorrelationUpdate(StatsUnit& unit, + const uint32_t& type_second, + const uint64_t& ts, + const std::string& key) { + if (type_second >= kTaTypeNum) { + fprintf(stderr, "Unknown Type Id: %u\n", type_second); + return Status::NotFound(); + } + + for (int type_first = 0; type_first < kTaTypeNum; type_first++) { + if (type_first >= static_cast<int>(ta_.size()) || + type_first >= static_cast<int>(analyzer_opts_.correlation_map.size())) { + break; + } + if (analyzer_opts_.correlation_map[type_first][type_second] < 0 || + ta_[type_first].stats.find(unit.cf_id) == ta_[type_first].stats.end() || + ta_[type_first].stats[unit.cf_id].a_key_stats.find(key) == + ta_[type_first].stats[unit.cf_id].a_key_stats.end() || + ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts == ts) { + continue; + } + + int correlation_id = + analyzer_opts_.correlation_map[type_first][type_second]; + + // after get the x-y operation time or x, update; + if (correlation_id < 0 || + correlation_id >= static_cast<int>(unit.v_correlation.size())) { + continue; + } + unit.v_correlation[correlation_id].count++; + unit.v_correlation[correlation_id].total_ts += + (ts - ta_[type_first].stats[unit.cf_id].a_key_stats[key].latest_ts); + } + + unit.latest_ts = ts; + return Status::OK(); +} + +// when a new trace statistic is created, the file handler +// pointers should be initiated if needed according to +// the trace analyzer options +Status TraceAnalyzer::OpenStatsOutputFiles(const std::string& type, + TraceStats& new_stats) { + Status s; + if (FLAGS_output_key_stats) { + s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_stats.txt", + &new_stats.a_key_f); + s = CreateOutputFile(type, new_stats.cf_name, + "accessed_unique_key_num_change.txt", + &new_stats.a_key_num_f); + if (!FLAGS_key_space_dir.empty()) { + s = CreateOutputFile(type, new_stats.cf_name, "whole_key_stats.txt", + &new_stats.w_key_f); + } + } + + if (FLAGS_output_access_count_stats) { + s = CreateOutputFile(type, new_stats.cf_name, + "accessed_key_count_distribution.txt", + &new_stats.a_count_dist_f); + } + + if (FLAGS_output_prefix_cut > 0) { + s = CreateOutputFile(type, new_stats.cf_name, "accessed_key_prefix_cut.txt", + &new_stats.a_prefix_cut_f); + if (!FLAGS_key_space_dir.empty()) { + s = CreateOutputFile(type, new_stats.cf_name, "whole_key_prefix_cut.txt", + &new_stats.w_prefix_cut_f); + } + + if (FLAGS_output_qps_stats) { + s = CreateOutputFile(type, new_stats.cf_name, + "accessed_top_k_qps_prefix_cut.txt", + &new_stats.a_top_qps_prefix_f); + } + } + + if (FLAGS_output_time_series) { + s = CreateOutputFile(type, new_stats.cf_name, "time_series.txt", + &new_stats.time_series_f); + } + + if (FLAGS_output_value_distribution) { + s = CreateOutputFile(type, new_stats.cf_name, + "accessed_value_size_distribution.txt", + &new_stats.a_value_size_f); + } + + if (FLAGS_output_key_distribution) { + s = CreateOutputFile(type, new_stats.cf_name, + "accessed_key_size_distribution.txt", + &new_stats.a_key_size_f); + } + + if (FLAGS_output_qps_stats) { + s = CreateOutputFile(type, new_stats.cf_name, "qps_stats.txt", + &new_stats.a_qps_f); + } + + return Status::OK(); +} + +// create the output path of the files to be opened +Status TraceAnalyzer::CreateOutputFile( + const std::string& type, const std::string& cf_name, + const std::string& ending, std::unique_ptr<rocksdb::WritableFile>* f_ptr) { + std::string path; + path = output_path_ + "/" + FLAGS_output_prefix + "-" + type + "-" + cf_name + + "-" + ending; + Status s; + s = env_->NewWritableFile(path, f_ptr, env_options_); + if (!s.ok()) { + fprintf(stderr, "Cannot open file: %s\n", path.c_str()); + exit(1); + } + return Status::OK(); +} + +// Close the output files in the TraceStats if they are opened +void TraceAnalyzer::CloseOutputFiles() { + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled) { + continue; + } + for (auto& stat : ta_[type].stats) { + if (stat.second.time_series_f) { + stat.second.time_series_f->Close(); + } + + if (stat.second.a_key_f) { + stat.second.a_key_f->Close(); + } + + if (stat.second.a_key_num_f) { + stat.second.a_key_num_f->Close(); + } + + if (stat.second.a_count_dist_f) { + stat.second.a_count_dist_f->Close(); + } + + if (stat.second.a_prefix_cut_f) { + stat.second.a_prefix_cut_f->Close(); + } + + if (stat.second.a_value_size_f) { + stat.second.a_value_size_f->Close(); + } + + if (stat.second.a_key_size_f) { + stat.second.a_key_size_f->Close(); + } + + if (stat.second.a_qps_f) { + stat.second.a_qps_f->Close(); + } + + if (stat.second.a_top_qps_prefix_f) { + stat.second.a_top_qps_prefix_f->Close(); + } + + if (stat.second.w_key_f) { + stat.second.w_key_f->Close(); + } + if (stat.second.w_prefix_cut_f) { + stat.second.w_prefix_cut_f->Close(); + } + } + } + return; +} + +// Handle the Get request in the trace +Status TraceAnalyzer::HandleGet(uint32_t column_family_id, + const std::string& key, const uint64_t& ts, + const uint32_t& get_ret) { + Status s; + size_t value_size = 0; + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key, + value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kGet].sample_count >= sample_max_) { + ta_[TraceOperationType::kGet].sample_count = 0; + } + if (ta_[TraceOperationType::kGet].sample_count > 0) { + ta_[TraceOperationType::kGet].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kGet].sample_count++; + + if (!ta_[TraceOperationType::kGet].enabled) { + return Status::OK(); + } + if (get_ret == 1) { + value_size = 10; + } + s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key, + value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Handle the Put request in the write batch of the trace +Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key, + const Slice& value) { + Status s; + size_t value_size = value.ToString().size(); + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(TraceOperationType::kPut, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kPut].sample_count >= sample_max_) { + ta_[TraceOperationType::kPut].sample_count = 0; + } + if (ta_[TraceOperationType::kPut].sample_count > 0) { + ta_[TraceOperationType::kPut].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kPut].sample_count++; + + if (!ta_[TraceOperationType::kPut].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(TraceOperationType::kPut, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Handle the Delete request in the write batch of the trace +Status TraceAnalyzer::HandleDelete(uint32_t column_family_id, + const Slice& key) { + Status s; + size_t value_size = 0; + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(TraceOperationType::kDelete, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kDelete].sample_count >= sample_max_) { + ta_[TraceOperationType::kDelete].sample_count = 0; + } + if (ta_[TraceOperationType::kDelete].sample_count > 0) { + ta_[TraceOperationType::kDelete].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kDelete].sample_count++; + + if (!ta_[TraceOperationType::kDelete].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(TraceOperationType::kDelete, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Handle the SingleDelete request in the write batch of the trace +Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id, + const Slice& key) { + Status s; + size_t value_size = 0; + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(TraceOperationType::kSingleDelete, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kSingleDelete].sample_count >= sample_max_) { + ta_[TraceOperationType::kSingleDelete].sample_count = 0; + } + if (ta_[TraceOperationType::kSingleDelete].sample_count > 0) { + ta_[TraceOperationType::kSingleDelete].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kSingleDelete].sample_count++; + + if (!ta_[TraceOperationType::kSingleDelete].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(TraceOperationType::kSingleDelete, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Handle the DeleteRange request in the write batch of the trace +Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id, + const Slice& begin_key, + const Slice& end_key) { + Status s; + size_t value_size = 0; + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(TraceOperationType::kRangeDelete, column_family_id, + begin_key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kRangeDelete].sample_count >= sample_max_) { + ta_[TraceOperationType::kRangeDelete].sample_count = 0; + } + if (ta_[TraceOperationType::kRangeDelete].sample_count > 0) { + ta_[TraceOperationType::kRangeDelete].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kRangeDelete].sample_count++; + + if (!ta_[TraceOperationType::kRangeDelete].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id, + begin_key.ToString(), value_size, c_time_); + s = KeyStatsInsertion(TraceOperationType::kRangeDelete, column_family_id, + end_key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Handle the Merge request in the write batch of the trace +Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key, + const Slice& value) { + Status s; + size_t value_size = value.ToString().size(); + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(TraceOperationType::kMerge, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kMerge].sample_count >= sample_max_) { + ta_[TraceOperationType::kMerge].sample_count = 0; + } + if (ta_[TraceOperationType::kMerge].sample_count > 0) { + ta_[TraceOperationType::kMerge].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kMerge].sample_count++; + + if (!ta_[TraceOperationType::kMerge].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(TraceOperationType::kMerge, column_family_id, + key.ToString(), value_size, c_time_); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Handle the Iterator request in the trace +Status TraceAnalyzer::HandleIter(uint32_t column_family_id, + const std::string& key, const uint64_t& ts, + TraceType& trace_type) { + Status s; + size_t value_size = 0; + int type = -1; + if (trace_type == kTraceIteratorSeek) { + type = TraceOperationType::kIteratorSeek; + } else if (trace_type == kTraceIteratorSeekForPrev) { + type = TraceOperationType::kIteratorSeekForPrev; + } else { + return s; + } + if (type == -1) { + return s; + } + + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(type, column_family_id, key, value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[type].sample_count >= sample_max_) { + ta_[type].sample_count = 0; + } + if (ta_[type].sample_count > 0) { + ta_[type].sample_count++; + return Status::OK(); + } + ta_[type].sample_count++; + + if (!ta_[type].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(type, column_family_id, key, value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +// Before the analyzer is closed, the requested general statistic results are +// printed out here. In current stage, these information are not output to +// the files. +// -----type +// |__cf_id +// |_statistics +void TraceAnalyzer::PrintStatistics() { + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled) { + continue; + } + ta_[type].total_keys = 0; + ta_[type].total_access = 0; + ta_[type].total_succ_access = 0; + printf("\n################# Operation Type: %s #####################\n", + ta_[type].type_name.c_str()); + if (qps_ave_.size() == kTaTypeNum + 1) { + printf("Peak QPS is: %u Average QPS is: %f\n", qps_peak_[type], + qps_ave_[type]); + } + for (auto& stat_it : ta_[type].stats) { + if (stat_it.second.a_count == 0) { + continue; + } + TraceStats& stat = stat_it.second; + uint64_t total_a_keys = static_cast<uint64_t>(stat.a_key_stats.size()); + double key_size_ave = 0.0; + double value_size_ave = 0.0; + double key_size_vari = 0.0; + double value_size_vari = 0.0; + if (stat.a_count > 0) { + key_size_ave = + (static_cast<double>(stat.a_key_size_sum)) / stat.a_count; + value_size_ave = + (static_cast<double>(stat.a_value_size_sum)) / stat.a_count; + key_size_vari = std::sqrt((static_cast<double>(stat.a_key_size_sqsum)) / + stat.a_count - + key_size_ave * key_size_ave); + value_size_vari = std::sqrt( + (static_cast<double>(stat.a_value_size_sqsum)) / stat.a_count - + value_size_ave * value_size_ave); + } + if (value_size_ave == 0.0) { + stat.a_value_mid = 0; + } + cfs_[stat.cf_id].a_count += total_a_keys; + ta_[type].total_keys += total_a_keys; + ta_[type].total_access += stat.a_count; + ta_[type].total_succ_access += stat.a_succ_count; + printf("*********************************************************\n"); + printf("colume family id: %u\n", stat.cf_id); + printf("Total number of queries to this cf by %s: %" PRIu64 "\n", + ta_[type].type_name.c_str(), stat.a_count); + printf("Total unique keys in this cf: %" PRIu64 "\n", total_a_keys); + printf("Average key size: %f key size medium: %" PRIu64 + " Key size Variation: %f\n", + key_size_ave, stat.a_key_mid, key_size_vari); + if (type == kPut || type == kMerge) { + printf("Average value size: %f Value size medium: %" PRIu64 + " Value size variation: %f\n", + value_size_ave, stat.a_value_mid, value_size_vari); + } + printf("Peak QPS is: %u Average QPS is: %f\n", stat.a_peak_qps, + stat.a_ave_qps); + + // print the top k accessed key and its access count + if (FLAGS_print_top_k_access > 0) { + printf("The Top %d keys that are accessed:\n", + FLAGS_print_top_k_access); + while (!stat.top_k_queue.empty()) { + std::string hex_key = + rocksdb::LDBCommand::StringToHex(stat.top_k_queue.top().second); + printf("Access_count: %" PRIu64 " %s\n", stat.top_k_queue.top().first, + hex_key.c_str()); + stat.top_k_queue.pop(); + } + } + + // print the top k access prefix range and + // top k prefix range with highest average access per key + if (FLAGS_output_prefix_cut > 0) { + printf("The Top %d accessed prefix range:\n", FLAGS_print_top_k_access); + while (!stat.top_k_prefix_access.empty()) { + printf("Prefix: %s Access count: %" PRIu64 "\n", + stat.top_k_prefix_access.top().second.c_str(), + stat.top_k_prefix_access.top().first); + stat.top_k_prefix_access.pop(); + } + + printf("The Top %d prefix with highest access per key:\n", + FLAGS_print_top_k_access); + while (!stat.top_k_prefix_ave.empty()) { + printf("Prefix: %s access per key: %f\n", + stat.top_k_prefix_ave.top().second.c_str(), + stat.top_k_prefix_ave.top().first); + stat.top_k_prefix_ave.pop(); + } + } + + // print the operation correlations + if (!FLAGS_print_correlation.empty()) { + for (int correlation = 0; + correlation < + static_cast<int>(analyzer_opts_.correlation_list.size()); + correlation++) { + printf( + "The correlation statistics of '%s' after '%s' is:", + taIndexToOpt[analyzer_opts_.correlation_list[correlation].second] + .c_str(), + taIndexToOpt[analyzer_opts_.correlation_list[correlation].first] + .c_str()); + double correlation_ave = 0.0; + if (stat.correlation_output[correlation].first > 0) { + correlation_ave = + (static_cast<double>( + stat.correlation_output[correlation].second)) / + (stat.correlation_output[correlation].first * 1000); + } + printf(" total numbers: %" PRIu64 " average time: %f(ms)\n", + stat.correlation_output[correlation].first, correlation_ave); + } + } + } + printf("*********************************************************\n"); + printf("Total keys of '%s' is: %" PRIu64 "\n", ta_[type].type_name.c_str(), + ta_[type].total_keys); + printf("Total access is: %" PRIu64 "\n", ta_[type].total_access); + total_access_keys_ += ta_[type].total_keys; + } + + // Print the overall statistic information of the trace + printf("\n*********************************************************\n"); + printf("*********************************************************\n"); + printf("The column family based statistics\n"); + for (auto& cf : cfs_) { + printf("The column family id: %u\n", cf.first); + printf("The whole key space key numbers: %" PRIu64 "\n", cf.second.w_count); + printf("The accessed key space key numbers: %" PRIu64 "\n", + cf.second.a_count); + } + + if (FLAGS_print_overall_stats) { + printf("\n*********************************************************\n"); + printf("*********************************************************\n"); + if (qps_peak_.size() == kTaTypeNum + 1) { + printf("Average QPS per second: %f Peak QPS: %u\n", qps_ave_[kTaTypeNum], + qps_peak_[kTaTypeNum]); + } + printf("The statistics related to query number need to times: %u\n", + sample_max_); + printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64 + " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n", + total_requests_, total_access_keys_, total_gets_, total_writes_); + for (int type = 0; type < kTaTypeNum; type++) { + if (!ta_[type].enabled) { + continue; + } + printf("Operation: '%s' has: %" PRIu64 "\n", ta_[type].type_name.c_str(), + ta_[type].total_access); + } + } +} + +// Write the trace sequence to file +Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type, + const uint32_t& cf_id, + const std::string& key, + const size_t value_size, + const uint64_t ts) { + std::string hex_key = rocksdb::LDBCommand::StringToHex(key); + int ret; + ret = + sprintf(buffer_, "%u %u %zu %" PRIu64 "\n", type, cf_id, value_size, ts); + if (ret < 0) { + return Status::IOError("failed to format the output"); + } + std::string printout(buffer_); + if (!FLAGS_no_key) { + printout = hex_key + " " + printout; + } + return trace_sequence_f_->Append(printout); +} + +// The entrance function of Trace_Analyzer +int trace_analyzer_tool(int argc, char** argv) { + std::string trace_path; + std::string output_path; + + AnalyzerOptions analyzer_opts; + + ParseCommandLineFlags(&argc, &argv, true); + + if (!FLAGS_print_correlation.empty()) { + analyzer_opts.SparseCorrelationInput(FLAGS_print_correlation); + } + + std::unique_ptr<TraceAnalyzer> analyzer( + new TraceAnalyzer(FLAGS_trace_path, FLAGS_output_dir, analyzer_opts)); + + if (!analyzer) { + fprintf(stderr, "Cannot initiate the trace analyzer\n"); + exit(1); + } + + rocksdb::Status s = analyzer->PrepareProcessing(); + if (!s.ok()) { + fprintf(stderr, "%s\n", s.getState()); + fprintf(stderr, "Cannot initiate the trace reader\n"); + exit(1); + } + + s = analyzer->StartProcessing(); + if (!s.ok() && !FLAGS_try_process_corrupted_trace) { + fprintf(stderr, "%s\n", s.getState()); + fprintf(stderr, "Cannot processing the trace\n"); + exit(1); + } + + s = analyzer->MakeStatistics(); + if (!s.ok()) { + fprintf(stderr, "%s\n", s.getState()); + analyzer->EndProcessing(); + fprintf(stderr, "Cannot make the statistics\n"); + exit(1); + } + + s = analyzer->ReProcessing(); + if (!s.ok()) { + fprintf(stderr, "%s\n", s.getState()); + fprintf(stderr, "Cannot re-process the trace for more statistics\n"); + analyzer->EndProcessing(); + exit(1); + } + + s = analyzer->EndProcessing(); + if (!s.ok()) { + fprintf(stderr, "%s\n", s.getState()); + fprintf(stderr, "Cannot close the trace analyzer\n"); + exit(1); + } + + return 0; +} +} // namespace rocksdb + +#endif // Endif of Gflag +#endif // RocksDB LITE |