summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/monitoring
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/monitoring/file_read_sample.h23
-rw-r--r--src/rocksdb/monitoring/histogram.cc288
-rw-r--r--src/rocksdb/monitoring/histogram.h149
-rw-r--r--src/rocksdb/monitoring/histogram_test.cc221
-rw-r--r--src/rocksdb/monitoring/histogram_windowing.cc202
-rw-r--r--src/rocksdb/monitoring/histogram_windowing.h80
-rw-r--r--src/rocksdb/monitoring/in_memory_stats_history.cc49
-rw-r--r--src/rocksdb/monitoring/in_memory_stats_history.h74
-rw-r--r--src/rocksdb/monitoring/instrumented_mutex.cc69
-rw-r--r--src/rocksdb/monitoring/instrumented_mutex.h98
-rw-r--r--src/rocksdb/monitoring/iostats_context.cc62
-rw-r--r--src/rocksdb/monitoring/iostats_context_imp.h60
-rw-r--r--src/rocksdb/monitoring/iostats_context_test.cc29
-rw-r--r--src/rocksdb/monitoring/perf_context.cc559
-rw-r--r--src/rocksdb/monitoring/perf_context_imp.h97
-rw-r--r--src/rocksdb/monitoring/perf_level.cc28
-rw-r--r--src/rocksdb/monitoring/perf_level_imp.h18
-rw-r--r--src/rocksdb/monitoring/perf_step_timer.h79
-rw-r--r--src/rocksdb/monitoring/persistent_stats_history.cc170
-rw-r--r--src/rocksdb/monitoring/persistent_stats_history.h83
-rw-r--r--src/rocksdb/monitoring/statistics.cc406
-rw-r--r--src/rocksdb/monitoring/statistics.h138
-rw-r--r--src/rocksdb/monitoring/statistics_test.cc47
-rw-r--r--src/rocksdb/monitoring/stats_history_test.cc653
-rw-r--r--src/rocksdb/monitoring/thread_status_impl.cc163
-rw-r--r--src/rocksdb/monitoring/thread_status_updater.cc314
-rw-r--r--src/rocksdb/monitoring/thread_status_updater.h233
-rw-r--r--src/rocksdb/monitoring/thread_status_updater_debug.cc42
-rw-r--r--src/rocksdb/monitoring/thread_status_util.cc206
-rw-r--r--src/rocksdb/monitoring/thread_status_util.h134
-rw-r--r--src/rocksdb/monitoring/thread_status_util_debug.cc32
31 files changed, 4806 insertions, 0 deletions
diff --git a/src/rocksdb/monitoring/file_read_sample.h b/src/rocksdb/monitoring/file_read_sample.h
new file mode 100644
index 000000000..82a933e0a
--- /dev/null
+++ b/src/rocksdb/monitoring/file_read_sample.h
@@ -0,0 +1,23 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "db/version_edit.h"
+#include "util/random.h"
+
+namespace ROCKSDB_NAMESPACE {
+static const uint32_t kFileReadSampleRate = 1024;
+extern bool should_sample_file_read();
+extern void sample_file_read_inc(FileMetaData*);
+
+inline bool should_sample_file_read() {
+ return (Random::GetTLSInstance()->Next() % kFileReadSampleRate == 307);
+}
+
+inline void sample_file_read_inc(FileMetaData* meta) {
+ meta->stats.num_reads_sampled.fetch_add(kFileReadSampleRate,
+ std::memory_order_relaxed);
+}
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/histogram.cc b/src/rocksdb/monitoring/histogram.cc
new file mode 100644
index 000000000..55339f888
--- /dev/null
+++ b/src/rocksdb/monitoring/histogram.cc
@@ -0,0 +1,288 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "monitoring/histogram.h"
+
+#include <stdio.h>
+#include <cassert>
+#include <cinttypes>
+#include <cmath>
+
+#include "port/port.h"
+#include "util/cast_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+HistogramBucketMapper::HistogramBucketMapper() {
+ // If you change this, you also need to change
+ // size of array buckets_ in HistogramImpl
+ bucketValues_ = {1, 2};
+ valueIndexMap_ = {{1, 0}, {2, 1}};
+ double bucket_val = static_cast<double>(bucketValues_.back());
+ while ((bucket_val = 1.5 * bucket_val) <= static_cast<double>(port::kMaxUint64)) {
+ bucketValues_.push_back(static_cast<uint64_t>(bucket_val));
+ // Extracts two most significant digits to make histogram buckets more
+ // human-readable. E.g., 172 becomes 170.
+ uint64_t pow_of_ten = 1;
+ while (bucketValues_.back() / 10 > 10) {
+ bucketValues_.back() /= 10;
+ pow_of_ten *= 10;
+ }
+ bucketValues_.back() *= pow_of_ten;
+ valueIndexMap_[bucketValues_.back()] = bucketValues_.size() - 1;
+ }
+ maxBucketValue_ = bucketValues_.back();
+ minBucketValue_ = bucketValues_.front();
+}
+
+size_t HistogramBucketMapper::IndexForValue(const uint64_t value) const {
+ if (value >= maxBucketValue_) {
+ return bucketValues_.size() - 1;
+ } else if ( value >= minBucketValue_ ) {
+ std::map<uint64_t, uint64_t>::const_iterator lowerBound =
+ valueIndexMap_.lower_bound(value);
+ if (lowerBound != valueIndexMap_.end()) {
+ return static_cast<size_t>(lowerBound->second);
+ } else {
+ return 0;
+ }
+ } else {
+ return 0;
+ }
+}
+
+namespace {
+ const HistogramBucketMapper bucketMapper;
+}
+
+HistogramStat::HistogramStat()
+ : num_buckets_(bucketMapper.BucketCount()) {
+ assert(num_buckets_ == sizeof(buckets_) / sizeof(*buckets_));
+ Clear();
+}
+
+void HistogramStat::Clear() {
+ min_.store(bucketMapper.LastValue(), std::memory_order_relaxed);
+ max_.store(0, std::memory_order_relaxed);
+ num_.store(0, std::memory_order_relaxed);
+ sum_.store(0, std::memory_order_relaxed);
+ sum_squares_.store(0, std::memory_order_relaxed);
+ for (unsigned int b = 0; b < num_buckets_; b++) {
+ buckets_[b].store(0, std::memory_order_relaxed);
+ }
+};
+
+bool HistogramStat::Empty() const { return num() == 0; }
+
+void HistogramStat::Add(uint64_t value) {
+ // This function is designed to be lock free, as it's in the critical path
+ // of any operation. Each individual value is atomic and the order of updates
+ // by concurrent threads is tolerable.
+ const size_t index = bucketMapper.IndexForValue(value);
+ assert(index < num_buckets_);
+ buckets_[index].store(buckets_[index].load(std::memory_order_relaxed) + 1,
+ std::memory_order_relaxed);
+
+ uint64_t old_min = min();
+ if (value < old_min) {
+ min_.store(value, std::memory_order_relaxed);
+ }
+
+ uint64_t old_max = max();
+ if (value > old_max) {
+ max_.store(value, std::memory_order_relaxed);
+ }
+
+ num_.store(num_.load(std::memory_order_relaxed) + 1,
+ std::memory_order_relaxed);
+ sum_.store(sum_.load(std::memory_order_relaxed) + value,
+ std::memory_order_relaxed);
+ sum_squares_.store(
+ sum_squares_.load(std::memory_order_relaxed) + value * value,
+ std::memory_order_relaxed);
+}
+
+void HistogramStat::Merge(const HistogramStat& other) {
+ // This function needs to be performned with the outer lock acquired
+ // However, atomic operation on every member is still need, since Add()
+ // requires no lock and value update can still happen concurrently
+ uint64_t old_min = min();
+ uint64_t other_min = other.min();
+ while (other_min < old_min &&
+ !min_.compare_exchange_weak(old_min, other_min)) {}
+
+ uint64_t old_max = max();
+ uint64_t other_max = other.max();
+ while (other_max > old_max &&
+ !max_.compare_exchange_weak(old_max, other_max)) {}
+
+ num_.fetch_add(other.num(), std::memory_order_relaxed);
+ sum_.fetch_add(other.sum(), std::memory_order_relaxed);
+ sum_squares_.fetch_add(other.sum_squares(), std::memory_order_relaxed);
+ for (unsigned int b = 0; b < num_buckets_; b++) {
+ buckets_[b].fetch_add(other.bucket_at(b), std::memory_order_relaxed);
+ }
+}
+
+double HistogramStat::Median() const {
+ return Percentile(50.0);
+}
+
+double HistogramStat::Percentile(double p) const {
+ double threshold = num() * (p / 100.0);
+ uint64_t cumulative_sum = 0;
+ for (unsigned int b = 0; b < num_buckets_; b++) {
+ uint64_t bucket_value = bucket_at(b);
+ cumulative_sum += bucket_value;
+ if (cumulative_sum >= threshold) {
+ // Scale linearly within this bucket
+ uint64_t left_point = (b == 0) ? 0 : bucketMapper.BucketLimit(b-1);
+ uint64_t right_point = bucketMapper.BucketLimit(b);
+ uint64_t left_sum = cumulative_sum - bucket_value;
+ uint64_t right_sum = cumulative_sum;
+ double pos = 0;
+ uint64_t right_left_diff = right_sum - left_sum;
+ if (right_left_diff != 0) {
+ pos = (threshold - left_sum) / right_left_diff;
+ }
+ double r = left_point + (right_point - left_point) * pos;
+ uint64_t cur_min = min();
+ uint64_t cur_max = max();
+ if (r < cur_min) r = static_cast<double>(cur_min);
+ if (r > cur_max) r = static_cast<double>(cur_max);
+ return r;
+ }
+ }
+ return static_cast<double>(max());
+}
+
+double HistogramStat::Average() const {
+ uint64_t cur_num = num();
+ uint64_t cur_sum = sum();
+ if (cur_num == 0) return 0;
+ return static_cast<double>(cur_sum) / static_cast<double>(cur_num);
+}
+
+double HistogramStat::StandardDeviation() const {
+ uint64_t cur_num = num();
+ uint64_t cur_sum = sum();
+ uint64_t cur_sum_squares = sum_squares();
+ if (cur_num == 0) return 0;
+ double variance =
+ static_cast<double>(cur_sum_squares * cur_num - cur_sum * cur_sum) /
+ static_cast<double>(cur_num * cur_num);
+ return std::sqrt(variance);
+}
+std::string HistogramStat::ToString() const {
+ uint64_t cur_num = num();
+ std::string r;
+ char buf[1650];
+ snprintf(buf, sizeof(buf),
+ "Count: %" PRIu64 " Average: %.4f StdDev: %.2f\n",
+ cur_num, Average(), StandardDeviation());
+ r.append(buf);
+ snprintf(buf, sizeof(buf),
+ "Min: %" PRIu64 " Median: %.4f Max: %" PRIu64 "\n",
+ (cur_num == 0 ? 0 : min()), Median(), (cur_num == 0 ? 0 : max()));
+ r.append(buf);
+ snprintf(buf, sizeof(buf),
+ "Percentiles: "
+ "P50: %.2f P75: %.2f P99: %.2f P99.9: %.2f P99.99: %.2f\n",
+ Percentile(50), Percentile(75), Percentile(99), Percentile(99.9),
+ Percentile(99.99));
+ r.append(buf);
+ r.append("------------------------------------------------------\n");
+ if (cur_num == 0) return r; // all buckets are empty
+ const double mult = 100.0 / cur_num;
+ uint64_t cumulative_sum = 0;
+ for (unsigned int b = 0; b < num_buckets_; b++) {
+ uint64_t bucket_value = bucket_at(b);
+ if (bucket_value <= 0.0) continue;
+ cumulative_sum += bucket_value;
+ snprintf(buf, sizeof(buf),
+ "%c %7" PRIu64 ", %7" PRIu64 " ] %8" PRIu64 " %7.3f%% %7.3f%% ",
+ (b == 0) ? '[' : '(',
+ (b == 0) ? 0 : bucketMapper.BucketLimit(b-1), // left
+ bucketMapper.BucketLimit(b), // right
+ bucket_value, // count
+ (mult * bucket_value), // percentage
+ (mult * cumulative_sum)); // cumulative percentage
+ r.append(buf);
+
+ // Add hash marks based on percentage; 20 marks for 100%.
+ size_t marks = static_cast<size_t>(mult * bucket_value / 5 + 0.5);
+ r.append(marks, '#');
+ r.push_back('\n');
+ }
+ return r;
+}
+
+void HistogramStat::Data(HistogramData * const data) const {
+ assert(data);
+ data->median = Median();
+ data->percentile95 = Percentile(95);
+ data->percentile99 = Percentile(99);
+ data->max = static_cast<double>(max());
+ data->average = Average();
+ data->standard_deviation = StandardDeviation();
+ data->count = num();
+ data->sum = sum();
+ data->min = static_cast<double>(min());
+}
+
+void HistogramImpl::Clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ stats_.Clear();
+}
+
+bool HistogramImpl::Empty() const {
+ return stats_.Empty();
+}
+
+void HistogramImpl::Add(uint64_t value) {
+ stats_.Add(value);
+}
+
+void HistogramImpl::Merge(const Histogram& other) {
+ if (strcmp(Name(), other.Name()) == 0) {
+ Merge(
+ *static_cast_with_check<const HistogramImpl, const Histogram>(&other));
+ }
+}
+
+void HistogramImpl::Merge(const HistogramImpl& other) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ stats_.Merge(other.stats_);
+}
+
+double HistogramImpl::Median() const {
+ return stats_.Median();
+}
+
+double HistogramImpl::Percentile(double p) const {
+ return stats_.Percentile(p);
+}
+
+double HistogramImpl::Average() const {
+ return stats_.Average();
+}
+
+double HistogramImpl::StandardDeviation() const {
+ return stats_.StandardDeviation();
+}
+
+std::string HistogramImpl::ToString() const {
+ return stats_.ToString();
+}
+
+void HistogramImpl::Data(HistogramData * const data) const {
+ stats_.Data(data);
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/histogram.h b/src/rocksdb/monitoring/histogram.h
new file mode 100644
index 000000000..a6b93e8fd
--- /dev/null
+++ b/src/rocksdb/monitoring/histogram.h
@@ -0,0 +1,149 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include "rocksdb/statistics.h"
+
+#include <cassert>
+#include <string>
+#include <vector>
+#include <map>
+#include <mutex>
+
+namespace ROCKSDB_NAMESPACE {
+
+class HistogramBucketMapper {
+ public:
+
+ HistogramBucketMapper();
+
+ // converts a value to the bucket index.
+ size_t IndexForValue(uint64_t value) const;
+ // number of buckets required.
+
+ size_t BucketCount() const {
+ return bucketValues_.size();
+ }
+
+ uint64_t LastValue() const {
+ return maxBucketValue_;
+ }
+
+ uint64_t FirstValue() const {
+ return minBucketValue_;
+ }
+
+ uint64_t BucketLimit(const size_t bucketNumber) const {
+ assert(bucketNumber < BucketCount());
+ return bucketValues_[bucketNumber];
+ }
+
+ private:
+ std::vector<uint64_t> bucketValues_;
+ uint64_t maxBucketValue_;
+ uint64_t minBucketValue_;
+ std::map<uint64_t, uint64_t> valueIndexMap_;
+};
+
+struct HistogramStat {
+ HistogramStat();
+ ~HistogramStat() {}
+
+ HistogramStat(const HistogramStat&) = delete;
+ HistogramStat& operator=(const HistogramStat&) = delete;
+
+ void Clear();
+ bool Empty() const;
+ void Add(uint64_t value);
+ void Merge(const HistogramStat& other);
+
+ inline uint64_t min() const { return min_.load(std::memory_order_relaxed); }
+ inline uint64_t max() const { return max_.load(std::memory_order_relaxed); }
+ inline uint64_t num() const { return num_.load(std::memory_order_relaxed); }
+ inline uint64_t sum() const { return sum_.load(std::memory_order_relaxed); }
+ inline uint64_t sum_squares() const {
+ return sum_squares_.load(std::memory_order_relaxed);
+ }
+ inline uint64_t bucket_at(size_t b) const {
+ return buckets_[b].load(std::memory_order_relaxed);
+ }
+
+ double Median() const;
+ double Percentile(double p) const;
+ double Average() const;
+ double StandardDeviation() const;
+ void Data(HistogramData* const data) const;
+ std::string ToString() const;
+
+ // To be able to use HistogramStat as thread local variable, it
+ // cannot have dynamic allocated member. That's why we're
+ // using manually values from BucketMapper
+ std::atomic_uint_fast64_t min_;
+ std::atomic_uint_fast64_t max_;
+ std::atomic_uint_fast64_t num_;
+ std::atomic_uint_fast64_t sum_;
+ std::atomic_uint_fast64_t sum_squares_;
+ std::atomic_uint_fast64_t buckets_[109]; // 109==BucketMapper::BucketCount()
+ const uint64_t num_buckets_;
+};
+
+class Histogram {
+public:
+ Histogram() {}
+ virtual ~Histogram() {};
+
+ virtual void Clear() = 0;
+ virtual bool Empty() const = 0;
+ virtual void Add(uint64_t value) = 0;
+ virtual void Merge(const Histogram&) = 0;
+
+ virtual std::string ToString() const = 0;
+ virtual const char* Name() const = 0;
+ virtual uint64_t min() const = 0;
+ virtual uint64_t max() const = 0;
+ virtual uint64_t num() const = 0;
+ virtual double Median() const = 0;
+ virtual double Percentile(double p) const = 0;
+ virtual double Average() const = 0;
+ virtual double StandardDeviation() const = 0;
+ virtual void Data(HistogramData* const data) const = 0;
+};
+
+class HistogramImpl : public Histogram {
+ public:
+ HistogramImpl() { Clear(); }
+
+ HistogramImpl(const HistogramImpl&) = delete;
+ HistogramImpl& operator=(const HistogramImpl&) = delete;
+
+ virtual void Clear() override;
+ virtual bool Empty() const override;
+ virtual void Add(uint64_t value) override;
+ virtual void Merge(const Histogram& other) override;
+ void Merge(const HistogramImpl& other);
+
+ virtual std::string ToString() const override;
+ virtual const char* Name() const override { return "HistogramImpl"; }
+ virtual uint64_t min() const override { return stats_.min(); }
+ virtual uint64_t max() const override { return stats_.max(); }
+ virtual uint64_t num() const override { return stats_.num(); }
+ virtual double Median() const override;
+ virtual double Percentile(double p) const override;
+ virtual double Average() const override;
+ virtual double StandardDeviation() const override;
+ virtual void Data(HistogramData* const data) const override;
+
+ virtual ~HistogramImpl() {}
+
+ private:
+ HistogramStat stats_;
+ std::mutex mutex_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/histogram_test.cc b/src/rocksdb/monitoring/histogram_test.cc
new file mode 100644
index 000000000..36a7d7154
--- /dev/null
+++ b/src/rocksdb/monitoring/histogram_test.cc
@@ -0,0 +1,221 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include <cmath>
+
+#include "monitoring/histogram.h"
+#include "monitoring/histogram_windowing.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class HistogramTest : public testing::Test {};
+
+namespace {
+ const double kIota = 0.1;
+ const HistogramBucketMapper bucketMapper;
+ Env* env = Env::Default();
+}
+
+void PopulateHistogram(Histogram& histogram,
+ uint64_t low, uint64_t high, uint64_t loop = 1) {
+ for (; loop > 0; loop--) {
+ for (uint64_t i = low; i <= high; i++) {
+ histogram.Add(i);
+ }
+ }
+}
+
+void BasicOperation(Histogram& histogram) {
+ PopulateHistogram(histogram, 1, 110, 10); // fill up to bucket [70, 110)
+
+ HistogramData data;
+ histogram.Data(&data);
+
+ ASSERT_LE(fabs(histogram.Percentile(100.0) - 110.0), kIota);
+ ASSERT_LE(fabs(data.percentile99 - 108.9), kIota); // 99 * 110 / 100
+ ASSERT_LE(fabs(data.percentile95 - 104.5), kIota); // 95 * 110 / 100
+ ASSERT_LE(fabs(data.median - 55.0), kIota); // 50 * 110 / 100
+ ASSERT_EQ(data.average, 55.5); // (1 + 110) / 2
+}
+
+void MergeHistogram(Histogram& histogram, Histogram& other) {
+ PopulateHistogram(histogram, 1, 100);
+ PopulateHistogram(other, 101, 250);
+ histogram.Merge(other);
+
+ HistogramData data;
+ histogram.Data(&data);
+
+ ASSERT_LE(fabs(histogram.Percentile(100.0) - 250.0), kIota);
+ ASSERT_LE(fabs(data.percentile99 - 247.5), kIota); // 99 * 250 / 100
+ ASSERT_LE(fabs(data.percentile95 - 237.5), kIota); // 95 * 250 / 100
+ ASSERT_LE(fabs(data.median - 125.0), kIota); // 50 * 250 / 100
+ ASSERT_EQ(data.average, 125.5); // (1 + 250) / 2
+}
+
+void EmptyHistogram(Histogram& histogram) {
+ ASSERT_EQ(histogram.min(), bucketMapper.LastValue());
+ ASSERT_EQ(histogram.max(), 0);
+ ASSERT_EQ(histogram.num(), 0);
+ ASSERT_EQ(histogram.Median(), 0.0);
+ ASSERT_EQ(histogram.Percentile(85.0), 0.0);
+ ASSERT_EQ(histogram.Average(), 0.0);
+ ASSERT_EQ(histogram.StandardDeviation(), 0.0);
+}
+
+void ClearHistogram(Histogram& histogram) {
+ for (uint64_t i = 1; i <= 100; i++) {
+ histogram.Add(i);
+ }
+ histogram.Clear();
+ ASSERT_TRUE(histogram.Empty());
+ ASSERT_EQ(histogram.Median(), 0);
+ ASSERT_EQ(histogram.Percentile(85.0), 0);
+ ASSERT_EQ(histogram.Average(), 0);
+}
+
+TEST_F(HistogramTest, BasicOperation) {
+ HistogramImpl histogram;
+ BasicOperation(histogram);
+
+ HistogramWindowingImpl histogramWindowing;
+ BasicOperation(histogramWindowing);
+}
+
+TEST_F(HistogramTest, BoundaryValue) {
+ HistogramImpl histogram;
+ // - both should be in [0, 1] bucket because we place values on bucket
+ // boundaries in the lower bucket.
+ // - all points are in [0, 1] bucket, so p50 will be 0.5
+ // - the test cannot be written with a single point since histogram won't
+ // report percentiles lower than the min or greater than the max.
+ histogram.Add(0);
+ histogram.Add(1);
+
+ ASSERT_LE(fabs(histogram.Percentile(50.0) - 0.5), kIota);
+}
+
+TEST_F(HistogramTest, MergeHistogram) {
+ HistogramImpl histogram;
+ HistogramImpl other;
+ MergeHistogram(histogram, other);
+
+ HistogramWindowingImpl histogramWindowing;
+ HistogramWindowingImpl otherWindowing;
+ MergeHistogram(histogramWindowing, otherWindowing);
+}
+
+TEST_F(HistogramTest, EmptyHistogram) {
+ HistogramImpl histogram;
+ EmptyHistogram(histogram);
+
+ HistogramWindowingImpl histogramWindowing;
+ EmptyHistogram(histogramWindowing);
+}
+
+TEST_F(HistogramTest, ClearHistogram) {
+ HistogramImpl histogram;
+ ClearHistogram(histogram);
+
+ HistogramWindowingImpl histogramWindowing;
+ ClearHistogram(histogramWindowing);
+}
+
+TEST_F(HistogramTest, HistogramWindowingExpire) {
+ uint64_t num_windows = 3;
+ int micros_per_window = 1000000;
+ uint64_t min_num_per_window = 0;
+
+ HistogramWindowingImpl
+ histogramWindowing(num_windows, micros_per_window, min_num_per_window);
+
+ PopulateHistogram(histogramWindowing, 1, 1, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 100);
+ ASSERT_EQ(histogramWindowing.min(), 1);
+ ASSERT_EQ(histogramWindowing.max(), 1);
+ ASSERT_EQ(histogramWindowing.Average(), 1);
+
+ PopulateHistogram(histogramWindowing, 2, 2, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 200);
+ ASSERT_EQ(histogramWindowing.min(), 1);
+ ASSERT_EQ(histogramWindowing.max(), 2);
+ ASSERT_EQ(histogramWindowing.Average(), 1.5);
+
+ PopulateHistogram(histogramWindowing, 3, 3, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 300);
+ ASSERT_EQ(histogramWindowing.min(), 1);
+ ASSERT_EQ(histogramWindowing.max(), 3);
+ ASSERT_EQ(histogramWindowing.Average(), 2.0);
+
+ // dropping oldest window with value 1, remaining 2 ~ 4
+ PopulateHistogram(histogramWindowing, 4, 4, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 300);
+ ASSERT_EQ(histogramWindowing.min(), 2);
+ ASSERT_EQ(histogramWindowing.max(), 4);
+ ASSERT_EQ(histogramWindowing.Average(), 3.0);
+
+ // dropping oldest window with value 2, remaining 3 ~ 5
+ PopulateHistogram(histogramWindowing, 5, 5, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 300);
+ ASSERT_EQ(histogramWindowing.min(), 3);
+ ASSERT_EQ(histogramWindowing.max(), 5);
+ ASSERT_EQ(histogramWindowing.Average(), 4.0);
+}
+
+TEST_F(HistogramTest, HistogramWindowingMerge) {
+ uint64_t num_windows = 3;
+ int micros_per_window = 1000000;
+ uint64_t min_num_per_window = 0;
+
+ HistogramWindowingImpl
+ histogramWindowing(num_windows, micros_per_window, min_num_per_window);
+ HistogramWindowingImpl
+ otherWindowing(num_windows, micros_per_window, min_num_per_window);
+
+ PopulateHistogram(histogramWindowing, 1, 1, 100);
+ PopulateHistogram(otherWindowing, 1, 1, 100);
+ env->SleepForMicroseconds(micros_per_window);
+
+ PopulateHistogram(histogramWindowing, 2, 2, 100);
+ PopulateHistogram(otherWindowing, 2, 2, 100);
+ env->SleepForMicroseconds(micros_per_window);
+
+ PopulateHistogram(histogramWindowing, 3, 3, 100);
+ PopulateHistogram(otherWindowing, 3, 3, 100);
+ env->SleepForMicroseconds(micros_per_window);
+
+ histogramWindowing.Merge(otherWindowing);
+ ASSERT_EQ(histogramWindowing.num(), 600);
+ ASSERT_EQ(histogramWindowing.min(), 1);
+ ASSERT_EQ(histogramWindowing.max(), 3);
+ ASSERT_EQ(histogramWindowing.Average(), 2.0);
+
+ // dropping oldest window with value 1, remaining 2 ~ 4
+ PopulateHistogram(histogramWindowing, 4, 4, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 500);
+ ASSERT_EQ(histogramWindowing.min(), 2);
+ ASSERT_EQ(histogramWindowing.max(), 4);
+
+ // dropping oldest window with value 2, remaining 3 ~ 5
+ PopulateHistogram(histogramWindowing, 5, 5, 100);
+ env->SleepForMicroseconds(micros_per_window);
+ ASSERT_EQ(histogramWindowing.num(), 400);
+ ASSERT_EQ(histogramWindowing.min(), 3);
+ ASSERT_EQ(histogramWindowing.max(), 5);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/monitoring/histogram_windowing.cc b/src/rocksdb/monitoring/histogram_windowing.cc
new file mode 100644
index 000000000..253512778
--- /dev/null
+++ b/src/rocksdb/monitoring/histogram_windowing.cc
@@ -0,0 +1,202 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "monitoring/histogram_windowing.h"
+#include "monitoring/histogram.h"
+#include "util/cast_util.h"
+
+#include <algorithm>
+
+namespace ROCKSDB_NAMESPACE {
+
+HistogramWindowingImpl::HistogramWindowingImpl() {
+ env_ = Env::Default();
+ window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
+ Clear();
+}
+
+HistogramWindowingImpl::HistogramWindowingImpl(
+ uint64_t num_windows,
+ uint64_t micros_per_window,
+ uint64_t min_num_per_window) :
+ num_windows_(num_windows),
+ micros_per_window_(micros_per_window),
+ min_num_per_window_(min_num_per_window) {
+ env_ = Env::Default();
+ window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
+ Clear();
+}
+
+HistogramWindowingImpl::~HistogramWindowingImpl() {
+}
+
+void HistogramWindowingImpl::Clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ stats_.Clear();
+ for (size_t i = 0; i < num_windows_; i++) {
+ window_stats_[i].Clear();
+ }
+ current_window_.store(0, std::memory_order_relaxed);
+ last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
+}
+
+bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
+
+// This function is designed to be lock free, as it's in the critical path
+// of any operation.
+// Each individual value is atomic, it is just that some samples can go
+// in the older bucket which is tolerable.
+void HistogramWindowingImpl::Add(uint64_t value){
+ TimerTick();
+
+ // Parent (global) member update
+ stats_.Add(value);
+
+ // Current window update
+ window_stats_[static_cast<size_t>(current_window())].Add(value);
+}
+
+void HistogramWindowingImpl::Merge(const Histogram& other) {
+ if (strcmp(Name(), other.Name()) == 0) {
+ Merge(
+ *static_cast_with_check<const HistogramWindowingImpl, const Histogram>(
+ &other));
+ }
+}
+
+void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ stats_.Merge(other.stats_);
+
+ if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
+ micros_per_window_ != other.micros_per_window_) {
+ return;
+ }
+
+ uint64_t cur_window = current_window();
+ uint64_t other_cur_window = other.current_window();
+ // going backwards for alignment
+ for (unsigned int i = 0;
+ i < std::min(num_windows_, other.num_windows_); i++) {
+ uint64_t window_index =
+ (cur_window + num_windows_ - i) % num_windows_;
+ uint64_t other_window_index =
+ (other_cur_window + other.num_windows_ - i) % other.num_windows_;
+ size_t windex = static_cast<size_t>(window_index);
+ size_t other_windex = static_cast<size_t>(other_window_index);
+
+ window_stats_[windex].Merge(
+ other.window_stats_[other_windex]);
+ }
+}
+
+std::string HistogramWindowingImpl::ToString() const {
+ return stats_.ToString();
+}
+
+double HistogramWindowingImpl::Median() const {
+ return Percentile(50.0);
+}
+
+double HistogramWindowingImpl::Percentile(double p) const {
+ // Retry 3 times in total
+ for (int retry = 0; retry < 3; retry++) {
+ uint64_t start_num = stats_.num();
+ double result = stats_.Percentile(p);
+ // Detect if swap buckets or Clear() was called during calculation
+ if (stats_.num() >= start_num) {
+ return result;
+ }
+ }
+ return 0.0;
+}
+
+double HistogramWindowingImpl::Average() const {
+ return stats_.Average();
+}
+
+double HistogramWindowingImpl::StandardDeviation() const {
+ return stats_.StandardDeviation();
+}
+
+void HistogramWindowingImpl::Data(HistogramData * const data) const {
+ stats_.Data(data);
+}
+
+void HistogramWindowingImpl::TimerTick() {
+ uint64_t curr_time = env_->NowMicros();
+ size_t curr_window_ = static_cast<size_t>(current_window());
+ if (curr_time - last_swap_time() > micros_per_window_ &&
+ window_stats_[curr_window_].num() >= min_num_per_window_) {
+ SwapHistoryBucket();
+ }
+}
+
+void HistogramWindowingImpl::SwapHistoryBucket() {
+ // Threads executing Add() would be competing for this mutex, the first one
+ // who got the metex would take care of the bucket swap, other threads
+ // can skip this.
+ // If mutex is held by Merge() or Clear(), next Add() will take care of the
+ // swap, if needed.
+ if (mutex_.try_lock()) {
+ last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
+
+ uint64_t curr_window = current_window();
+ uint64_t next_window = (curr_window == num_windows_ - 1) ?
+ 0 : curr_window + 1;
+
+ // subtract next buckets from totals and swap to next buckets
+ HistogramStat& stats_to_drop =
+ window_stats_[static_cast<size_t>(next_window)];
+
+ if (!stats_to_drop.Empty()) {
+ for (size_t b = 0; b < stats_.num_buckets_; b++){
+ stats_.buckets_[b].fetch_sub(
+ stats_to_drop.bucket_at(b), std::memory_order_relaxed);
+ }
+
+ if (stats_.min() == stats_to_drop.min()) {
+ uint64_t new_min = std::numeric_limits<uint64_t>::max();
+ for (unsigned int i = 0; i < num_windows_; i++) {
+ if (i != next_window) {
+ uint64_t m = window_stats_[i].min();
+ if (m < new_min) new_min = m;
+ }
+ }
+ stats_.min_.store(new_min, std::memory_order_relaxed);
+ }
+
+ if (stats_.max() == stats_to_drop.max()) {
+ uint64_t new_max = 0;
+ for (unsigned int i = 0; i < num_windows_; i++) {
+ if (i != next_window) {
+ uint64_t m = window_stats_[i].max();
+ if (m > new_max) new_max = m;
+ }
+ }
+ stats_.max_.store(new_max, std::memory_order_relaxed);
+ }
+
+ stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
+ stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
+ stats_.sum_squares_.fetch_sub(
+ stats_to_drop.sum_squares(), std::memory_order_relaxed);
+
+ stats_to_drop.Clear();
+ }
+
+ // advance to next window bucket
+ current_window_.store(next_window, std::memory_order_relaxed);
+
+ mutex_.unlock();
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/histogram_windowing.h b/src/rocksdb/monitoring/histogram_windowing.h
new file mode 100644
index 000000000..72545b07f
--- /dev/null
+++ b/src/rocksdb/monitoring/histogram_windowing.h
@@ -0,0 +1,80 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include "monitoring/histogram.h"
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class HistogramWindowingImpl : public Histogram
+{
+public:
+ HistogramWindowingImpl();
+ HistogramWindowingImpl(uint64_t num_windows,
+ uint64_t micros_per_window,
+ uint64_t min_num_per_window);
+
+ HistogramWindowingImpl(const HistogramWindowingImpl&) = delete;
+ HistogramWindowingImpl& operator=(const HistogramWindowingImpl&) = delete;
+
+ ~HistogramWindowingImpl();
+
+ virtual void Clear() override;
+ virtual bool Empty() const override;
+ virtual void Add(uint64_t value) override;
+ virtual void Merge(const Histogram& other) override;
+ void Merge(const HistogramWindowingImpl& other);
+
+ virtual std::string ToString() const override;
+ virtual const char* Name() const override { return "HistogramWindowingImpl"; }
+ virtual uint64_t min() const override { return stats_.min(); }
+ virtual uint64_t max() const override { return stats_.max(); }
+ virtual uint64_t num() const override { return stats_.num(); }
+ virtual double Median() const override;
+ virtual double Percentile(double p) const override;
+ virtual double Average() const override;
+ virtual double StandardDeviation() const override;
+ virtual void Data(HistogramData* const data) const override;
+
+private:
+ void TimerTick();
+ void SwapHistoryBucket();
+ inline uint64_t current_window() const {
+ return current_window_.load(std::memory_order_relaxed);
+ }
+ inline uint64_t last_swap_time() const{
+ return last_swap_time_.load(std::memory_order_relaxed);
+ }
+
+ Env* env_;
+ std::mutex mutex_;
+
+ // Aggregated stats over windows_stats_, all the computation is done
+ // upon aggregated values
+ HistogramStat stats_;
+
+ // This is a circular array representing the latest N time-windows.
+ // Each entry stores a time-window of data. Expiration is done
+ // on window-based.
+ std::unique_ptr<HistogramStat[]> window_stats_;
+
+ std::atomic_uint_fast64_t current_window_;
+ std::atomic_uint_fast64_t last_swap_time_;
+
+ // Following parameters are configuable
+ uint64_t num_windows_ = 5;
+ uint64_t micros_per_window_ = 60000000;
+ // By default, don't care about the number of values in current window
+ // when decide whether to swap windows or not.
+ uint64_t min_num_per_window_ = 0;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/in_memory_stats_history.cc b/src/rocksdb/monitoring/in_memory_stats_history.cc
new file mode 100644
index 000000000..dba791e2b
--- /dev/null
+++ b/src/rocksdb/monitoring/in_memory_stats_history.cc
@@ -0,0 +1,49 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "monitoring/in_memory_stats_history.h"
+#include "db/db_impl/db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+InMemoryStatsHistoryIterator::~InMemoryStatsHistoryIterator() {}
+
+bool InMemoryStatsHistoryIterator::Valid() const { return valid_; }
+
+Status InMemoryStatsHistoryIterator::status() const { return status_; }
+
+// Because of garbage collection, the next stats snapshot may or may not be
+// right after the current one. When reading from DBImpl::stats_history_, this
+// call will be protected by DB Mutex so it will not return partial or
+// corrupted results.
+void InMemoryStatsHistoryIterator::Next() {
+ // increment start_time by 1 to avoid infinite loop
+ AdvanceIteratorByTime(GetStatsTime() + 1, end_time_);
+}
+
+uint64_t InMemoryStatsHistoryIterator::GetStatsTime() const { return time_; }
+
+const std::map<std::string, uint64_t>&
+InMemoryStatsHistoryIterator::GetStatsMap() const {
+ return stats_map_;
+}
+
+// advance the iterator to the next time between [start_time, end_time)
+// if success, update time_ and stats_map_ with new_time and stats_map
+void InMemoryStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time,
+ uint64_t end_time) {
+ // try to find next entry in stats_history_ map
+ if (db_impl_ != nullptr) {
+ valid_ =
+ db_impl_->FindStatsByTime(start_time, end_time, &time_, &stats_map_);
+ } else {
+ valid_ = false;
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/in_memory_stats_history.h b/src/rocksdb/monitoring/in_memory_stats_history.h
new file mode 100644
index 000000000..3be864fe2
--- /dev/null
+++ b/src/rocksdb/monitoring/in_memory_stats_history.h
@@ -0,0 +1,74 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include "rocksdb/stats_history.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// InMemoryStatsHistoryIterator can be used to access stats history that was
+// stored by an in-memory two level std::map(DBImpl::stats_history_). It keeps
+// a copy of the stats snapshot (in stats_map_) that is currently being pointed
+// to, which allows the iterator to access the stats snapshot even when
+// the background garbage collecting thread purges it from the source of truth
+// (`DBImpl::stats_history_`). In that case, the iterator will continue to be
+// valid until a call to `Next()` returns no result and invalidates it. In
+// some extreme cases, the iterator may also return fragmented segments of
+// stats snapshots due to long gaps between `Next()` calls and interleaved
+// garbage collection.
+class InMemoryStatsHistoryIterator final : public StatsHistoryIterator {
+ public:
+ // Setup InMemoryStatsHistoryIterator to return stats snapshots between
+ // seconds timestamps [start_time, end_time)
+ InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time,
+ DBImpl* db_impl)
+ : start_time_(start_time),
+ end_time_(end_time),
+ valid_(true),
+ db_impl_(db_impl) {
+ AdvanceIteratorByTime(start_time_, end_time_);
+ }
+ // no copying allowed
+ InMemoryStatsHistoryIterator(const InMemoryStatsHistoryIterator&) = delete;
+ void operator=(const InMemoryStatsHistoryIterator&) = delete;
+ InMemoryStatsHistoryIterator(InMemoryStatsHistoryIterator&&) = delete;
+ InMemoryStatsHistoryIterator& operator=(InMemoryStatsHistoryIterator&&) =
+ delete;
+
+ ~InMemoryStatsHistoryIterator() override;
+ bool Valid() const override;
+ Status status() const override;
+
+ // Move to the next stats snapshot currently available
+ // This function may invalidate the iterator
+ // REQUIRES: Valid()
+ void Next() override;
+
+ // REQUIRES: Valid()
+ uint64_t GetStatsTime() const override;
+
+ // This function is idempotent
+ // REQUIRES: Valid()
+ const std::map<std::string, uint64_t>& GetStatsMap() const override;
+
+ private:
+ // advance the iterator to the next stats history record with timestamp
+ // between [start_time, end_time)
+ void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time);
+
+ uint64_t time_;
+ uint64_t start_time_;
+ uint64_t end_time_;
+ std::map<std::string, uint64_t> stats_map_;
+ Status status_;
+ bool valid_;
+ DBImpl* db_impl_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/instrumented_mutex.cc b/src/rocksdb/monitoring/instrumented_mutex.cc
new file mode 100644
index 000000000..8b06c60ff
--- /dev/null
+++ b/src/rocksdb/monitoring/instrumented_mutex.cc
@@ -0,0 +1,69 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "monitoring/instrumented_mutex.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/thread_status_util.h"
+#include "test_util/sync_point.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
+Statistics* stats_for_report(Env* env, Statistics* stats) {
+ if (env != nullptr && stats != nullptr &&
+ stats->get_stats_level() > kExceptTimeForMutex) {
+ return stats;
+ } else {
+ return nullptr;
+ }
+}
+} // namespace
+
+void InstrumentedMutex::Lock() {
+ PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(
+ db_mutex_lock_nanos, stats_code_ == DB_MUTEX_WAIT_MICROS,
+ stats_for_report(env_, stats_), stats_code_);
+ LockInternal();
+}
+
+void InstrumentedMutex::LockInternal() {
+#ifndef NDEBUG
+ ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
+#endif
+ mutex_.Lock();
+}
+
+void InstrumentedCondVar::Wait() {
+ PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(
+ db_condition_wait_nanos, stats_code_ == DB_MUTEX_WAIT_MICROS,
+ stats_for_report(env_, stats_), stats_code_);
+ WaitInternal();
+}
+
+void InstrumentedCondVar::WaitInternal() {
+#ifndef NDEBUG
+ ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
+#endif
+ cond_.Wait();
+}
+
+bool InstrumentedCondVar::TimedWait(uint64_t abs_time_us) {
+ PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(
+ db_condition_wait_nanos, stats_code_ == DB_MUTEX_WAIT_MICROS,
+ stats_for_report(env_, stats_), stats_code_);
+ return TimedWaitInternal(abs_time_us);
+}
+
+bool InstrumentedCondVar::TimedWaitInternal(uint64_t abs_time_us) {
+#ifndef NDEBUG
+ ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT);
+#endif
+
+ TEST_SYNC_POINT_CALLBACK("InstrumentedCondVar::TimedWaitInternal",
+ &abs_time_us);
+
+ return cond_.TimedWait(abs_time_us);
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/instrumented_mutex.h b/src/rocksdb/monitoring/instrumented_mutex.h
new file mode 100644
index 000000000..50c1f29c8
--- /dev/null
+++ b/src/rocksdb/monitoring/instrumented_mutex.h
@@ -0,0 +1,98 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "monitoring/statistics.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/thread_status.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+class InstrumentedCondVar;
+
+// A wrapper class for port::Mutex that provides additional layer
+// for collecting stats and instrumentation.
+class InstrumentedMutex {
+ public:
+ explicit InstrumentedMutex(bool adaptive = false)
+ : mutex_(adaptive), stats_(nullptr), env_(nullptr),
+ stats_code_(0) {}
+
+ InstrumentedMutex(
+ Statistics* stats, Env* env,
+ int stats_code, bool adaptive = false)
+ : mutex_(adaptive), stats_(stats), env_(env),
+ stats_code_(stats_code) {}
+
+ void Lock();
+
+ void Unlock() {
+ mutex_.Unlock();
+ }
+
+ void AssertHeld() {
+ mutex_.AssertHeld();
+ }
+
+ private:
+ void LockInternal();
+ friend class InstrumentedCondVar;
+ port::Mutex mutex_;
+ Statistics* stats_;
+ Env* env_;
+ int stats_code_;
+};
+
+// A wrapper class for port::Mutex that provides additional layer
+// for collecting stats and instrumentation.
+class InstrumentedMutexLock {
+ public:
+ explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) {
+ mutex_->Lock();
+ }
+
+ ~InstrumentedMutexLock() {
+ mutex_->Unlock();
+ }
+
+ private:
+ InstrumentedMutex* const mutex_;
+ InstrumentedMutexLock(const InstrumentedMutexLock&) = delete;
+ void operator=(const InstrumentedMutexLock&) = delete;
+};
+
+class InstrumentedCondVar {
+ public:
+ explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex)
+ : cond_(&(instrumented_mutex->mutex_)),
+ stats_(instrumented_mutex->stats_),
+ env_(instrumented_mutex->env_),
+ stats_code_(instrumented_mutex->stats_code_) {}
+
+ void Wait();
+
+ bool TimedWait(uint64_t abs_time_us);
+
+ void Signal() {
+ cond_.Signal();
+ }
+
+ void SignalAll() {
+ cond_.SignalAll();
+ }
+
+ private:
+ void WaitInternal();
+ bool TimedWaitInternal(uint64_t abs_time_us);
+ port::CondVar cond_;
+ Statistics* stats_;
+ Env* env_;
+ int stats_code_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/iostats_context.cc b/src/rocksdb/monitoring/iostats_context.cc
new file mode 100644
index 000000000..2960f05e8
--- /dev/null
+++ b/src/rocksdb/monitoring/iostats_context.cc
@@ -0,0 +1,62 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <sstream>
+#include "monitoring/iostats_context_imp.h"
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
+__thread IOStatsContext iostats_context;
+#endif
+
+IOStatsContext* get_iostats_context() {
+#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
+ return &iostats_context;
+#else
+ return nullptr;
+#endif
+}
+
+void IOStatsContext::Reset() {
+ thread_pool_id = Env::Priority::TOTAL;
+ bytes_read = 0;
+ bytes_written = 0;
+ open_nanos = 0;
+ allocate_nanos = 0;
+ write_nanos = 0;
+ read_nanos = 0;
+ range_sync_nanos = 0;
+ prepare_write_nanos = 0;
+ fsync_nanos = 0;
+ logger_nanos = 0;
+}
+
+#define IOSTATS_CONTEXT_OUTPUT(counter) \
+ if (!exclude_zero_counters || counter > 0) { \
+ ss << #counter << " = " << counter << ", "; \
+ }
+
+std::string IOStatsContext::ToString(bool exclude_zero_counters) const {
+ std::ostringstream ss;
+ IOSTATS_CONTEXT_OUTPUT(thread_pool_id);
+ IOSTATS_CONTEXT_OUTPUT(bytes_read);
+ IOSTATS_CONTEXT_OUTPUT(bytes_written);
+ IOSTATS_CONTEXT_OUTPUT(open_nanos);
+ IOSTATS_CONTEXT_OUTPUT(allocate_nanos);
+ IOSTATS_CONTEXT_OUTPUT(write_nanos);
+ IOSTATS_CONTEXT_OUTPUT(read_nanos);
+ IOSTATS_CONTEXT_OUTPUT(range_sync_nanos);
+ IOSTATS_CONTEXT_OUTPUT(fsync_nanos);
+ IOSTATS_CONTEXT_OUTPUT(prepare_write_nanos);
+ IOSTATS_CONTEXT_OUTPUT(logger_nanos);
+
+ std::string str = ss.str();
+ str.erase(str.find_last_not_of(", ") + 1);
+ return str;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/iostats_context_imp.h b/src/rocksdb/monitoring/iostats_context_imp.h
new file mode 100644
index 000000000..a7f095d6e
--- /dev/null
+++ b/src/rocksdb/monitoring/iostats_context_imp.h
@@ -0,0 +1,60 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "monitoring/perf_step_timer.h"
+#include "rocksdb/iostats_context.h"
+
+#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
+namespace ROCKSDB_NAMESPACE {
+extern __thread IOStatsContext iostats_context;
+} // namespace ROCKSDB_NAMESPACE
+
+// increment a specific counter by the specified value
+#define IOSTATS_ADD(metric, value) (iostats_context.metric += value)
+
+// Increase metric value only when it is positive
+#define IOSTATS_ADD_IF_POSITIVE(metric, value) \
+ if (value > 0) { IOSTATS_ADD(metric, value); }
+
+// reset a specific counter to zero
+#define IOSTATS_RESET(metric) (iostats_context.metric = 0)
+
+// reset all counters to zero
+#define IOSTATS_RESET_ALL() (iostats_context.Reset())
+
+#define IOSTATS_SET_THREAD_POOL_ID(value) \
+ (iostats_context.thread_pool_id = value)
+
+#define IOSTATS_THREAD_POOL_ID() (iostats_context.thread_pool_id)
+
+#define IOSTATS(metric) (iostats_context.metric)
+
+// Declare and set start time of the timer
+#define IOSTATS_TIMER_GUARD(metric) \
+ PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \
+ iostats_step_timer_##metric.Start();
+
+// Declare and set start time of the timer
+#define IOSTATS_CPU_TIMER_GUARD(metric, env) \
+ PerfStepTimer iostats_step_timer_##metric( \
+ &(iostats_context.metric), env, true, \
+ PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
+ iostats_step_timer_##metric.Start();
+
+#else // ROCKSDB_SUPPORT_THREAD_LOCAL
+
+#define IOSTATS_ADD(metric, value)
+#define IOSTATS_ADD_IF_POSITIVE(metric, value)
+#define IOSTATS_RESET(metric)
+#define IOSTATS_RESET_ALL()
+#define IOSTATS_SET_THREAD_POOL_ID(value)
+#define IOSTATS_THREAD_POOL_ID()
+#define IOSTATS(metric) 0
+
+#define IOSTATS_TIMER_GUARD(metric)
+#define IOSTATS_CPU_TIMER_GUARD(metric, env) static_cast<void>(env)
+
+#endif // ROCKSDB_SUPPORT_THREAD_LOCAL
diff --git a/src/rocksdb/monitoring/iostats_context_test.cc b/src/rocksdb/monitoring/iostats_context_test.cc
new file mode 100644
index 000000000..49f6fc058
--- /dev/null
+++ b/src/rocksdb/monitoring/iostats_context_test.cc
@@ -0,0 +1,29 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/iostats_context.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+TEST(IOStatsContextTest, ToString) {
+ get_iostats_context()->Reset();
+ get_iostats_context()->bytes_read = 12345;
+
+ std::string zero_included = get_iostats_context()->ToString();
+ ASSERT_NE(std::string::npos, zero_included.find("= 0"));
+ ASSERT_NE(std::string::npos, zero_included.find("= 12345"));
+
+ std::string zero_excluded = get_iostats_context()->ToString(true);
+ ASSERT_EQ(std::string::npos, zero_excluded.find("= 0"));
+ ASSERT_NE(std::string::npos, zero_excluded.find("= 12345"));
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/monitoring/perf_context.cc b/src/rocksdb/monitoring/perf_context.cc
new file mode 100644
index 000000000..446a9b766
--- /dev/null
+++ b/src/rocksdb/monitoring/perf_context.cc
@@ -0,0 +1,559 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#include <sstream>
+#include "monitoring/perf_context_imp.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
+PerfContext perf_context;
+#else
+#if defined(OS_SOLARIS)
+__thread PerfContext perf_context_;
+#else
+thread_local PerfContext perf_context;
+#endif
+#endif
+
+PerfContext* get_perf_context() {
+#if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
+ return &perf_context;
+#else
+#if defined(OS_SOLARIS)
+ return &perf_context_;
+#else
+ return &perf_context;
+#endif
+#endif
+}
+
+PerfContext::~PerfContext() {
+#if !defined(NPERF_CONTEXT) && defined(ROCKSDB_SUPPORT_THREAD_LOCAL) && !defined(OS_SOLARIS)
+ ClearPerLevelPerfContext();
+#endif
+}
+
+PerfContext::PerfContext(const PerfContext& other) {
+#ifndef NPERF_CONTEXT
+ user_key_comparison_count = other.user_key_comparison_count;
+ block_cache_hit_count = other.block_cache_hit_count;
+ block_read_count = other.block_read_count;
+ block_read_byte = other.block_read_byte;
+ block_read_time = other.block_read_time;
+ block_cache_index_hit_count = other.block_cache_index_hit_count;
+ index_block_read_count = other.index_block_read_count;
+ block_cache_filter_hit_count = other.block_cache_filter_hit_count;
+ filter_block_read_count = other.filter_block_read_count;
+ compression_dict_block_read_count = other.compression_dict_block_read_count;
+ block_checksum_time = other.block_checksum_time;
+ block_decompress_time = other.block_decompress_time;
+ get_read_bytes = other.get_read_bytes;
+ multiget_read_bytes = other.multiget_read_bytes;
+ iter_read_bytes = other.iter_read_bytes;
+ internal_key_skipped_count = other.internal_key_skipped_count;
+ internal_delete_skipped_count = other.internal_delete_skipped_count;
+ internal_recent_skipped_count = other.internal_recent_skipped_count;
+ internal_merge_count = other.internal_merge_count;
+ write_wal_time = other.write_wal_time;
+ get_snapshot_time = other.get_snapshot_time;
+ get_from_memtable_time = other.get_from_memtable_time;
+ get_from_memtable_count = other.get_from_memtable_count;
+ get_post_process_time = other.get_post_process_time;
+ get_from_output_files_time = other.get_from_output_files_time;
+ seek_on_memtable_time = other.seek_on_memtable_time;
+ seek_on_memtable_count = other.seek_on_memtable_count;
+ next_on_memtable_count = other.next_on_memtable_count;
+ prev_on_memtable_count = other.prev_on_memtable_count;
+ seek_child_seek_time = other.seek_child_seek_time;
+ seek_child_seek_count = other.seek_child_seek_count;
+ seek_min_heap_time = other.seek_min_heap_time;
+ seek_internal_seek_time = other.seek_internal_seek_time;
+ find_next_user_entry_time = other.find_next_user_entry_time;
+ write_pre_and_post_process_time = other.write_pre_and_post_process_time;
+ write_memtable_time = other.write_memtable_time;
+ write_delay_time = other.write_delay_time;
+ write_thread_wait_nanos = other.write_thread_wait_nanos;
+ write_scheduling_flushes_compactions_time =
+ other.write_scheduling_flushes_compactions_time;
+ db_mutex_lock_nanos = other.db_mutex_lock_nanos;
+ db_condition_wait_nanos = other.db_condition_wait_nanos;
+ merge_operator_time_nanos = other.merge_operator_time_nanos;
+ read_index_block_nanos = other.read_index_block_nanos;
+ read_filter_block_nanos = other.read_filter_block_nanos;
+ new_table_block_iter_nanos = other.new_table_block_iter_nanos;
+ new_table_iterator_nanos = other.new_table_iterator_nanos;
+ block_seek_nanos = other.block_seek_nanos;
+ find_table_nanos = other.find_table_nanos;
+ bloom_memtable_hit_count = other.bloom_memtable_hit_count;
+ bloom_memtable_miss_count = other.bloom_memtable_miss_count;
+ bloom_sst_hit_count = other.bloom_sst_hit_count;
+ bloom_sst_miss_count = other.bloom_sst_miss_count;
+ key_lock_wait_time = other.key_lock_wait_time;
+ key_lock_wait_count = other.key_lock_wait_count;
+
+ env_new_sequential_file_nanos = other.env_new_sequential_file_nanos;
+ env_new_random_access_file_nanos = other.env_new_random_access_file_nanos;
+ env_new_writable_file_nanos = other.env_new_writable_file_nanos;
+ env_reuse_writable_file_nanos = other.env_reuse_writable_file_nanos;
+ env_new_random_rw_file_nanos = other.env_new_random_rw_file_nanos;
+ env_new_directory_nanos = other.env_new_directory_nanos;
+ env_file_exists_nanos = other.env_file_exists_nanos;
+ env_get_children_nanos = other.env_get_children_nanos;
+ env_get_children_file_attributes_nanos =
+ other.env_get_children_file_attributes_nanos;
+ env_delete_file_nanos = other.env_delete_file_nanos;
+ env_create_dir_nanos = other.env_create_dir_nanos;
+ env_create_dir_if_missing_nanos = other.env_create_dir_if_missing_nanos;
+ env_delete_dir_nanos = other.env_delete_dir_nanos;
+ env_get_file_size_nanos = other.env_get_file_size_nanos;
+ env_get_file_modification_time_nanos =
+ other.env_get_file_modification_time_nanos;
+ env_rename_file_nanos = other.env_rename_file_nanos;
+ env_link_file_nanos = other.env_link_file_nanos;
+ env_lock_file_nanos = other.env_lock_file_nanos;
+ env_unlock_file_nanos = other.env_unlock_file_nanos;
+ env_new_logger_nanos = other.env_new_logger_nanos;
+ get_cpu_nanos = other.get_cpu_nanos;
+ iter_next_cpu_nanos = other.iter_next_cpu_nanos;
+ iter_prev_cpu_nanos = other.iter_prev_cpu_nanos;
+ iter_seek_cpu_nanos = other.iter_seek_cpu_nanos;
+ if (per_level_perf_context_enabled && level_to_perf_context != nullptr) {
+ ClearPerLevelPerfContext();
+ }
+ if (other.level_to_perf_context != nullptr) {
+ level_to_perf_context = new std::map<uint32_t, PerfContextByLevel>();
+ *level_to_perf_context = *other.level_to_perf_context;
+ }
+ per_level_perf_context_enabled = other.per_level_perf_context_enabled;
+#endif
+}
+
+PerfContext::PerfContext(PerfContext&& other) noexcept {
+#ifndef NPERF_CONTEXT
+ user_key_comparison_count = other.user_key_comparison_count;
+ block_cache_hit_count = other.block_cache_hit_count;
+ block_read_count = other.block_read_count;
+ block_read_byte = other.block_read_byte;
+ block_read_time = other.block_read_time;
+ block_cache_index_hit_count = other.block_cache_index_hit_count;
+ index_block_read_count = other.index_block_read_count;
+ block_cache_filter_hit_count = other.block_cache_filter_hit_count;
+ filter_block_read_count = other.filter_block_read_count;
+ compression_dict_block_read_count = other.compression_dict_block_read_count;
+ block_checksum_time = other.block_checksum_time;
+ block_decompress_time = other.block_decompress_time;
+ get_read_bytes = other.get_read_bytes;
+ multiget_read_bytes = other.multiget_read_bytes;
+ iter_read_bytes = other.iter_read_bytes;
+ internal_key_skipped_count = other.internal_key_skipped_count;
+ internal_delete_skipped_count = other.internal_delete_skipped_count;
+ internal_recent_skipped_count = other.internal_recent_skipped_count;
+ internal_merge_count = other.internal_merge_count;
+ write_wal_time = other.write_wal_time;
+ get_snapshot_time = other.get_snapshot_time;
+ get_from_memtable_time = other.get_from_memtable_time;
+ get_from_memtable_count = other.get_from_memtable_count;
+ get_post_process_time = other.get_post_process_time;
+ get_from_output_files_time = other.get_from_output_files_time;
+ seek_on_memtable_time = other.seek_on_memtable_time;
+ seek_on_memtable_count = other.seek_on_memtable_count;
+ next_on_memtable_count = other.next_on_memtable_count;
+ prev_on_memtable_count = other.prev_on_memtable_count;
+ seek_child_seek_time = other.seek_child_seek_time;
+ seek_child_seek_count = other.seek_child_seek_count;
+ seek_min_heap_time = other.seek_min_heap_time;
+ seek_internal_seek_time = other.seek_internal_seek_time;
+ find_next_user_entry_time = other.find_next_user_entry_time;
+ write_pre_and_post_process_time = other.write_pre_and_post_process_time;
+ write_memtable_time = other.write_memtable_time;
+ write_delay_time = other.write_delay_time;
+ write_thread_wait_nanos = other.write_thread_wait_nanos;
+ write_scheduling_flushes_compactions_time =
+ other.write_scheduling_flushes_compactions_time;
+ db_mutex_lock_nanos = other.db_mutex_lock_nanos;
+ db_condition_wait_nanos = other.db_condition_wait_nanos;
+ merge_operator_time_nanos = other.merge_operator_time_nanos;
+ read_index_block_nanos = other.read_index_block_nanos;
+ read_filter_block_nanos = other.read_filter_block_nanos;
+ new_table_block_iter_nanos = other.new_table_block_iter_nanos;
+ new_table_iterator_nanos = other.new_table_iterator_nanos;
+ block_seek_nanos = other.block_seek_nanos;
+ find_table_nanos = other.find_table_nanos;
+ bloom_memtable_hit_count = other.bloom_memtable_hit_count;
+ bloom_memtable_miss_count = other.bloom_memtable_miss_count;
+ bloom_sst_hit_count = other.bloom_sst_hit_count;
+ bloom_sst_miss_count = other.bloom_sst_miss_count;
+ key_lock_wait_time = other.key_lock_wait_time;
+ key_lock_wait_count = other.key_lock_wait_count;
+
+ env_new_sequential_file_nanos = other.env_new_sequential_file_nanos;
+ env_new_random_access_file_nanos = other.env_new_random_access_file_nanos;
+ env_new_writable_file_nanos = other.env_new_writable_file_nanos;
+ env_reuse_writable_file_nanos = other.env_reuse_writable_file_nanos;
+ env_new_random_rw_file_nanos = other.env_new_random_rw_file_nanos;
+ env_new_directory_nanos = other.env_new_directory_nanos;
+ env_file_exists_nanos = other.env_file_exists_nanos;
+ env_get_children_nanos = other.env_get_children_nanos;
+ env_get_children_file_attributes_nanos =
+ other.env_get_children_file_attributes_nanos;
+ env_delete_file_nanos = other.env_delete_file_nanos;
+ env_create_dir_nanos = other.env_create_dir_nanos;
+ env_create_dir_if_missing_nanos = other.env_create_dir_if_missing_nanos;
+ env_delete_dir_nanos = other.env_delete_dir_nanos;
+ env_get_file_size_nanos = other.env_get_file_size_nanos;
+ env_get_file_modification_time_nanos =
+ other.env_get_file_modification_time_nanos;
+ env_rename_file_nanos = other.env_rename_file_nanos;
+ env_link_file_nanos = other.env_link_file_nanos;
+ env_lock_file_nanos = other.env_lock_file_nanos;
+ env_unlock_file_nanos = other.env_unlock_file_nanos;
+ env_new_logger_nanos = other.env_new_logger_nanos;
+ get_cpu_nanos = other.get_cpu_nanos;
+ iter_next_cpu_nanos = other.iter_next_cpu_nanos;
+ iter_prev_cpu_nanos = other.iter_prev_cpu_nanos;
+ iter_seek_cpu_nanos = other.iter_seek_cpu_nanos;
+ if (per_level_perf_context_enabled && level_to_perf_context != nullptr) {
+ ClearPerLevelPerfContext();
+ }
+ if (other.level_to_perf_context != nullptr) {
+ level_to_perf_context = other.level_to_perf_context;
+ other.level_to_perf_context = nullptr;
+ }
+ per_level_perf_context_enabled = other.per_level_perf_context_enabled;
+#endif
+}
+
+// TODO(Zhongyi): reduce code duplication between copy constructor and
+// assignment operator
+PerfContext& PerfContext::operator=(const PerfContext& other) {
+#ifndef NPERF_CONTEXT
+ user_key_comparison_count = other.user_key_comparison_count;
+ block_cache_hit_count = other.block_cache_hit_count;
+ block_read_count = other.block_read_count;
+ block_read_byte = other.block_read_byte;
+ block_read_time = other.block_read_time;
+ block_cache_index_hit_count = other.block_cache_index_hit_count;
+ index_block_read_count = other.index_block_read_count;
+ block_cache_filter_hit_count = other.block_cache_filter_hit_count;
+ filter_block_read_count = other.filter_block_read_count;
+ compression_dict_block_read_count = other.compression_dict_block_read_count;
+ block_checksum_time = other.block_checksum_time;
+ block_decompress_time = other.block_decompress_time;
+ get_read_bytes = other.get_read_bytes;
+ multiget_read_bytes = other.multiget_read_bytes;
+ iter_read_bytes = other.iter_read_bytes;
+ internal_key_skipped_count = other.internal_key_skipped_count;
+ internal_delete_skipped_count = other.internal_delete_skipped_count;
+ internal_recent_skipped_count = other.internal_recent_skipped_count;
+ internal_merge_count = other.internal_merge_count;
+ write_wal_time = other.write_wal_time;
+ get_snapshot_time = other.get_snapshot_time;
+ get_from_memtable_time = other.get_from_memtable_time;
+ get_from_memtable_count = other.get_from_memtable_count;
+ get_post_process_time = other.get_post_process_time;
+ get_from_output_files_time = other.get_from_output_files_time;
+ seek_on_memtable_time = other.seek_on_memtable_time;
+ seek_on_memtable_count = other.seek_on_memtable_count;
+ next_on_memtable_count = other.next_on_memtable_count;
+ prev_on_memtable_count = other.prev_on_memtable_count;
+ seek_child_seek_time = other.seek_child_seek_time;
+ seek_child_seek_count = other.seek_child_seek_count;
+ seek_min_heap_time = other.seek_min_heap_time;
+ seek_internal_seek_time = other.seek_internal_seek_time;
+ find_next_user_entry_time = other.find_next_user_entry_time;
+ write_pre_and_post_process_time = other.write_pre_and_post_process_time;
+ write_memtable_time = other.write_memtable_time;
+ write_delay_time = other.write_delay_time;
+ write_thread_wait_nanos = other.write_thread_wait_nanos;
+ write_scheduling_flushes_compactions_time =
+ other.write_scheduling_flushes_compactions_time;
+ db_mutex_lock_nanos = other.db_mutex_lock_nanos;
+ db_condition_wait_nanos = other.db_condition_wait_nanos;
+ merge_operator_time_nanos = other.merge_operator_time_nanos;
+ read_index_block_nanos = other.read_index_block_nanos;
+ read_filter_block_nanos = other.read_filter_block_nanos;
+ new_table_block_iter_nanos = other.new_table_block_iter_nanos;
+ new_table_iterator_nanos = other.new_table_iterator_nanos;
+ block_seek_nanos = other.block_seek_nanos;
+ find_table_nanos = other.find_table_nanos;
+ bloom_memtable_hit_count = other.bloom_memtable_hit_count;
+ bloom_memtable_miss_count = other.bloom_memtable_miss_count;
+ bloom_sst_hit_count = other.bloom_sst_hit_count;
+ bloom_sst_miss_count = other.bloom_sst_miss_count;
+ key_lock_wait_time = other.key_lock_wait_time;
+ key_lock_wait_count = other.key_lock_wait_count;
+
+ env_new_sequential_file_nanos = other.env_new_sequential_file_nanos;
+ env_new_random_access_file_nanos = other.env_new_random_access_file_nanos;
+ env_new_writable_file_nanos = other.env_new_writable_file_nanos;
+ env_reuse_writable_file_nanos = other.env_reuse_writable_file_nanos;
+ env_new_random_rw_file_nanos = other.env_new_random_rw_file_nanos;
+ env_new_directory_nanos = other.env_new_directory_nanos;
+ env_file_exists_nanos = other.env_file_exists_nanos;
+ env_get_children_nanos = other.env_get_children_nanos;
+ env_get_children_file_attributes_nanos =
+ other.env_get_children_file_attributes_nanos;
+ env_delete_file_nanos = other.env_delete_file_nanos;
+ env_create_dir_nanos = other.env_create_dir_nanos;
+ env_create_dir_if_missing_nanos = other.env_create_dir_if_missing_nanos;
+ env_delete_dir_nanos = other.env_delete_dir_nanos;
+ env_get_file_size_nanos = other.env_get_file_size_nanos;
+ env_get_file_modification_time_nanos =
+ other.env_get_file_modification_time_nanos;
+ env_rename_file_nanos = other.env_rename_file_nanos;
+ env_link_file_nanos = other.env_link_file_nanos;
+ env_lock_file_nanos = other.env_lock_file_nanos;
+ env_unlock_file_nanos = other.env_unlock_file_nanos;
+ env_new_logger_nanos = other.env_new_logger_nanos;
+ get_cpu_nanos = other.get_cpu_nanos;
+ iter_next_cpu_nanos = other.iter_next_cpu_nanos;
+ iter_prev_cpu_nanos = other.iter_prev_cpu_nanos;
+ iter_seek_cpu_nanos = other.iter_seek_cpu_nanos;
+ if (per_level_perf_context_enabled && level_to_perf_context != nullptr) {
+ ClearPerLevelPerfContext();
+ }
+ if (other.level_to_perf_context != nullptr) {
+ level_to_perf_context = new std::map<uint32_t, PerfContextByLevel>();
+ *level_to_perf_context = *other.level_to_perf_context;
+ }
+ per_level_perf_context_enabled = other.per_level_perf_context_enabled;
+#endif
+ return *this;
+}
+
+void PerfContext::Reset() {
+#ifndef NPERF_CONTEXT
+ user_key_comparison_count = 0;
+ block_cache_hit_count = 0;
+ block_read_count = 0;
+ block_read_byte = 0;
+ block_read_time = 0;
+ block_cache_index_hit_count = 0;
+ index_block_read_count = 0;
+ block_cache_filter_hit_count = 0;
+ filter_block_read_count = 0;
+ compression_dict_block_read_count = 0;
+ block_checksum_time = 0;
+ block_decompress_time = 0;
+ get_read_bytes = 0;
+ multiget_read_bytes = 0;
+ iter_read_bytes = 0;
+ internal_key_skipped_count = 0;
+ internal_delete_skipped_count = 0;
+ internal_recent_skipped_count = 0;
+ internal_merge_count = 0;
+ write_wal_time = 0;
+
+ get_snapshot_time = 0;
+ get_from_memtable_time = 0;
+ get_from_memtable_count = 0;
+ get_post_process_time = 0;
+ get_from_output_files_time = 0;
+ seek_on_memtable_time = 0;
+ seek_on_memtable_count = 0;
+ next_on_memtable_count = 0;
+ prev_on_memtable_count = 0;
+ seek_child_seek_time = 0;
+ seek_child_seek_count = 0;
+ seek_min_heap_time = 0;
+ seek_internal_seek_time = 0;
+ find_next_user_entry_time = 0;
+ write_pre_and_post_process_time = 0;
+ write_memtable_time = 0;
+ write_delay_time = 0;
+ write_thread_wait_nanos = 0;
+ write_scheduling_flushes_compactions_time = 0;
+ db_mutex_lock_nanos = 0;
+ db_condition_wait_nanos = 0;
+ merge_operator_time_nanos = 0;
+ read_index_block_nanos = 0;
+ read_filter_block_nanos = 0;
+ new_table_block_iter_nanos = 0;
+ new_table_iterator_nanos = 0;
+ block_seek_nanos = 0;
+ find_table_nanos = 0;
+ bloom_memtable_hit_count = 0;
+ bloom_memtable_miss_count = 0;
+ bloom_sst_hit_count = 0;
+ bloom_sst_miss_count = 0;
+ key_lock_wait_time = 0;
+ key_lock_wait_count = 0;
+
+ env_new_sequential_file_nanos = 0;
+ env_new_random_access_file_nanos = 0;
+ env_new_writable_file_nanos = 0;
+ env_reuse_writable_file_nanos = 0;
+ env_new_random_rw_file_nanos = 0;
+ env_new_directory_nanos = 0;
+ env_file_exists_nanos = 0;
+ env_get_children_nanos = 0;
+ env_get_children_file_attributes_nanos = 0;
+ env_delete_file_nanos = 0;
+ env_create_dir_nanos = 0;
+ env_create_dir_if_missing_nanos = 0;
+ env_delete_dir_nanos = 0;
+ env_get_file_size_nanos = 0;
+ env_get_file_modification_time_nanos = 0;
+ env_rename_file_nanos = 0;
+ env_link_file_nanos = 0;
+ env_lock_file_nanos = 0;
+ env_unlock_file_nanos = 0;
+ env_new_logger_nanos = 0;
+ get_cpu_nanos = 0;
+ iter_next_cpu_nanos = 0;
+ iter_prev_cpu_nanos = 0;
+ iter_seek_cpu_nanos = 0;
+ if (per_level_perf_context_enabled && level_to_perf_context) {
+ for (auto& kv : *level_to_perf_context) {
+ kv.second.Reset();
+ }
+ }
+#endif
+}
+
+#define PERF_CONTEXT_OUTPUT(counter) \
+ if (!exclude_zero_counters || (counter > 0)) { \
+ ss << #counter << " = " << counter << ", "; \
+ }
+
+#define PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(counter) \
+ if (per_level_perf_context_enabled && \
+ level_to_perf_context) { \
+ ss << #counter << " = "; \
+ for (auto& kv : *level_to_perf_context) { \
+ if (!exclude_zero_counters || (kv.second.counter > 0)) { \
+ ss << kv.second.counter << "@level" << kv.first << ", "; \
+ } \
+ } \
+ }
+
+void PerfContextByLevel::Reset() {
+#ifndef NPERF_CONTEXT
+ bloom_filter_useful = 0;
+ bloom_filter_full_positive = 0;
+ bloom_filter_full_true_positive = 0;
+ block_cache_hit_count = 0;
+ block_cache_miss_count = 0;
+#endif
+}
+
+std::string PerfContext::ToString(bool exclude_zero_counters) const {
+#ifdef NPERF_CONTEXT
+ return "";
+#else
+ std::ostringstream ss;
+ PERF_CONTEXT_OUTPUT(user_key_comparison_count);
+ PERF_CONTEXT_OUTPUT(block_cache_hit_count);
+ PERF_CONTEXT_OUTPUT(block_read_count);
+ PERF_CONTEXT_OUTPUT(block_read_byte);
+ PERF_CONTEXT_OUTPUT(block_read_time);
+ PERF_CONTEXT_OUTPUT(block_cache_index_hit_count);
+ PERF_CONTEXT_OUTPUT(index_block_read_count);
+ PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count);
+ PERF_CONTEXT_OUTPUT(filter_block_read_count);
+ PERF_CONTEXT_OUTPUT(compression_dict_block_read_count);
+ PERF_CONTEXT_OUTPUT(block_checksum_time);
+ PERF_CONTEXT_OUTPUT(block_decompress_time);
+ PERF_CONTEXT_OUTPUT(get_read_bytes);
+ PERF_CONTEXT_OUTPUT(multiget_read_bytes);
+ PERF_CONTEXT_OUTPUT(iter_read_bytes);
+ PERF_CONTEXT_OUTPUT(internal_key_skipped_count);
+ PERF_CONTEXT_OUTPUT(internal_delete_skipped_count);
+ PERF_CONTEXT_OUTPUT(internal_recent_skipped_count);
+ PERF_CONTEXT_OUTPUT(internal_merge_count);
+ PERF_CONTEXT_OUTPUT(write_wal_time);
+ PERF_CONTEXT_OUTPUT(get_snapshot_time);
+ PERF_CONTEXT_OUTPUT(get_from_memtable_time);
+ PERF_CONTEXT_OUTPUT(get_from_memtable_count);
+ PERF_CONTEXT_OUTPUT(get_post_process_time);
+ PERF_CONTEXT_OUTPUT(get_from_output_files_time);
+ PERF_CONTEXT_OUTPUT(seek_on_memtable_time);
+ PERF_CONTEXT_OUTPUT(seek_on_memtable_count);
+ PERF_CONTEXT_OUTPUT(next_on_memtable_count);
+ PERF_CONTEXT_OUTPUT(prev_on_memtable_count);
+ PERF_CONTEXT_OUTPUT(seek_child_seek_time);
+ PERF_CONTEXT_OUTPUT(seek_child_seek_count);
+ PERF_CONTEXT_OUTPUT(seek_min_heap_time);
+ PERF_CONTEXT_OUTPUT(seek_internal_seek_time);
+ PERF_CONTEXT_OUTPUT(find_next_user_entry_time);
+ PERF_CONTEXT_OUTPUT(write_pre_and_post_process_time);
+ PERF_CONTEXT_OUTPUT(write_memtable_time);
+ PERF_CONTEXT_OUTPUT(write_thread_wait_nanos);
+ PERF_CONTEXT_OUTPUT(write_scheduling_flushes_compactions_time);
+ PERF_CONTEXT_OUTPUT(db_mutex_lock_nanos);
+ PERF_CONTEXT_OUTPUT(db_condition_wait_nanos);
+ PERF_CONTEXT_OUTPUT(merge_operator_time_nanos);
+ PERF_CONTEXT_OUTPUT(write_delay_time);
+ PERF_CONTEXT_OUTPUT(read_index_block_nanos);
+ PERF_CONTEXT_OUTPUT(read_filter_block_nanos);
+ PERF_CONTEXT_OUTPUT(new_table_block_iter_nanos);
+ PERF_CONTEXT_OUTPUT(new_table_iterator_nanos);
+ PERF_CONTEXT_OUTPUT(block_seek_nanos);
+ PERF_CONTEXT_OUTPUT(find_table_nanos);
+ PERF_CONTEXT_OUTPUT(bloom_memtable_hit_count);
+ PERF_CONTEXT_OUTPUT(bloom_memtable_miss_count);
+ PERF_CONTEXT_OUTPUT(bloom_sst_hit_count);
+ PERF_CONTEXT_OUTPUT(bloom_sst_miss_count);
+ PERF_CONTEXT_OUTPUT(key_lock_wait_time);
+ PERF_CONTEXT_OUTPUT(key_lock_wait_count);
+ PERF_CONTEXT_OUTPUT(env_new_sequential_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_new_random_access_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_new_writable_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_reuse_writable_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_new_random_rw_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_new_directory_nanos);
+ PERF_CONTEXT_OUTPUT(env_file_exists_nanos);
+ PERF_CONTEXT_OUTPUT(env_get_children_nanos);
+ PERF_CONTEXT_OUTPUT(env_get_children_file_attributes_nanos);
+ PERF_CONTEXT_OUTPUT(env_delete_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_create_dir_nanos);
+ PERF_CONTEXT_OUTPUT(env_create_dir_if_missing_nanos);
+ PERF_CONTEXT_OUTPUT(env_delete_dir_nanos);
+ PERF_CONTEXT_OUTPUT(env_get_file_size_nanos);
+ PERF_CONTEXT_OUTPUT(env_get_file_modification_time_nanos);
+ PERF_CONTEXT_OUTPUT(env_rename_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_link_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_lock_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_unlock_file_nanos);
+ PERF_CONTEXT_OUTPUT(env_new_logger_nanos);
+ PERF_CONTEXT_OUTPUT(get_cpu_nanos);
+ PERF_CONTEXT_OUTPUT(iter_next_cpu_nanos);
+ PERF_CONTEXT_OUTPUT(iter_prev_cpu_nanos);
+ PERF_CONTEXT_OUTPUT(iter_seek_cpu_nanos);
+ PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_useful);
+ PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_full_positive);
+ PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_full_true_positive);
+ PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(block_cache_hit_count);
+ PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(block_cache_miss_count);
+
+ std::string str = ss.str();
+ str.erase(str.find_last_not_of(", ") + 1);
+ return str;
+#endif
+}
+
+void PerfContext::EnablePerLevelPerfContext() {
+ if (level_to_perf_context == nullptr) {
+ level_to_perf_context = new std::map<uint32_t, PerfContextByLevel>();
+ }
+ per_level_perf_context_enabled = true;
+}
+
+void PerfContext::DisablePerLevelPerfContext(){
+ per_level_perf_context_enabled = false;
+}
+
+void PerfContext::ClearPerLevelPerfContext(){
+ if (level_to_perf_context != nullptr) {
+ level_to_perf_context->clear();
+ delete level_to_perf_context;
+ level_to_perf_context = nullptr;
+ }
+ per_level_perf_context_enabled = false;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/perf_context_imp.h b/src/rocksdb/monitoring/perf_context_imp.h
new file mode 100644
index 000000000..cdca27621
--- /dev/null
+++ b/src/rocksdb/monitoring/perf_context_imp.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "monitoring/perf_step_timer.h"
+#include "rocksdb/perf_context.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+#if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
+extern PerfContext perf_context;
+#else
+#if defined(OS_SOLARIS)
+extern __thread PerfContext perf_context_;
+#define perf_context (*get_perf_context())
+#else
+extern thread_local PerfContext perf_context;
+#endif
+#endif
+
+#if defined(NPERF_CONTEXT)
+
+#define PERF_TIMER_STOP(metric)
+#define PERF_TIMER_START(metric)
+#define PERF_TIMER_GUARD(metric)
+#define PERF_TIMER_GUARD_WITH_ENV(metric, env)
+#define PERF_CPU_TIMER_GUARD(metric, env)
+#define PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(metric, condition, stats, \
+ ticker_type)
+#define PERF_TIMER_MEASURE(metric)
+#define PERF_COUNTER_ADD(metric, value)
+#define PERF_COUNTER_BY_LEVEL_ADD(metric, value, level)
+
+#else
+
+// Stop the timer and update the metric
+#define PERF_TIMER_STOP(metric) perf_step_timer_##metric.Stop();
+
+#define PERF_TIMER_START(metric) perf_step_timer_##metric.Start();
+
+// Declare and set start time of the timer
+#define PERF_TIMER_GUARD(metric) \
+ PerfStepTimer perf_step_timer_##metric(&(perf_context.metric)); \
+ perf_step_timer_##metric.Start();
+
+// Declare and set start time of the timer
+#define PERF_TIMER_GUARD_WITH_ENV(metric, env) \
+ PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), env); \
+ perf_step_timer_##metric.Start();
+
+// Declare and set start time of the timer
+#define PERF_CPU_TIMER_GUARD(metric, env) \
+ PerfStepTimer perf_step_timer_##metric( \
+ &(perf_context.metric), env, true, \
+ PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
+ perf_step_timer_##metric.Start();
+
+#define PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(metric, condition, stats, \
+ ticker_type) \
+ PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), nullptr, \
+ false, PerfLevel::kEnableTime, stats, \
+ ticker_type); \
+ if (condition) { \
+ perf_step_timer_##metric.Start(); \
+ }
+
+// Update metric with time elapsed since last START. start time is reset
+// to current timestamp.
+#define PERF_TIMER_MEASURE(metric) perf_step_timer_##metric.Measure();
+
+// Increase metric value
+#define PERF_COUNTER_ADD(metric, value) \
+ if (perf_level >= PerfLevel::kEnableCount) { \
+ perf_context.metric += value; \
+ }
+
+// Increase metric value
+#define PERF_COUNTER_BY_LEVEL_ADD(metric, value, level) \
+ if (perf_level >= PerfLevel::kEnableCount && \
+ perf_context.per_level_perf_context_enabled && \
+ perf_context.level_to_perf_context) { \
+ if ((*(perf_context.level_to_perf_context)).find(level) != \
+ (*(perf_context.level_to_perf_context)).end()) { \
+ (*(perf_context.level_to_perf_context))[level].metric += value; \
+ } \
+ else { \
+ PerfContextByLevel empty_context; \
+ (*(perf_context.level_to_perf_context))[level] = empty_context; \
+ (*(perf_context.level_to_perf_context))[level].metric += value; \
+ } \
+ } \
+
+#endif
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/perf_level.cc b/src/rocksdb/monitoring/perf_level.cc
new file mode 100644
index 000000000..27bff0d28
--- /dev/null
+++ b/src/rocksdb/monitoring/perf_level.cc
@@ -0,0 +1,28 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#include <assert.h>
+#include "monitoring/perf_level_imp.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
+__thread PerfLevel perf_level = kEnableCount;
+#else
+PerfLevel perf_level = kEnableCount;
+#endif
+
+void SetPerfLevel(PerfLevel level) {
+ assert(level > kUninitialized);
+ assert(level < kOutOfBounds);
+ perf_level = level;
+}
+
+PerfLevel GetPerfLevel() {
+ return perf_level;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/perf_level_imp.h b/src/rocksdb/monitoring/perf_level_imp.h
new file mode 100644
index 000000000..01277af57
--- /dev/null
+++ b/src/rocksdb/monitoring/perf_level_imp.h
@@ -0,0 +1,18 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "rocksdb/perf_level.h"
+#include "port/port.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
+extern __thread PerfLevel perf_level;
+#else
+extern PerfLevel perf_level;
+#endif
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/perf_step_timer.h b/src/rocksdb/monitoring/perf_step_timer.h
new file mode 100644
index 000000000..f2d35d9d6
--- /dev/null
+++ b/src/rocksdb/monitoring/perf_step_timer.h
@@ -0,0 +1,79 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "monitoring/perf_level_imp.h"
+#include "rocksdb/env.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class PerfStepTimer {
+ public:
+ explicit PerfStepTimer(
+ uint64_t* metric, Env* env = nullptr, bool use_cpu_time = false,
+ PerfLevel enable_level = PerfLevel::kEnableTimeExceptForMutex,
+ Statistics* statistics = nullptr, uint32_t ticker_type = 0)
+ : perf_counter_enabled_(perf_level >= enable_level),
+ use_cpu_time_(use_cpu_time),
+ env_((perf_counter_enabled_ || statistics != nullptr)
+ ? ((env != nullptr) ? env : Env::Default())
+ : nullptr),
+ start_(0),
+ metric_(metric),
+ statistics_(statistics),
+ ticker_type_(ticker_type) {}
+
+ ~PerfStepTimer() {
+ Stop();
+ }
+
+ void Start() {
+ if (perf_counter_enabled_ || statistics_ != nullptr) {
+ start_ = time_now();
+ }
+ }
+
+ uint64_t time_now() {
+ if (!use_cpu_time_) {
+ return env_->NowNanos();
+ } else {
+ return env_->NowCPUNanos();
+ }
+ }
+
+ void Measure() {
+ if (start_) {
+ uint64_t now = time_now();
+ *metric_ += now - start_;
+ start_ = now;
+ }
+ }
+
+ void Stop() {
+ if (start_) {
+ uint64_t duration = time_now() - start_;
+ if (perf_counter_enabled_) {
+ *metric_ += duration;
+ }
+
+ if (statistics_ != nullptr) {
+ RecordTick(statistics_, ticker_type_, duration);
+ }
+ start_ = 0;
+ }
+ }
+
+ private:
+ const bool perf_counter_enabled_;
+ const bool use_cpu_time_;
+ Env* const env_;
+ uint64_t start_;
+ uint64_t* metric_;
+ Statistics* statistics_;
+ uint32_t ticker_type_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/persistent_stats_history.cc b/src/rocksdb/monitoring/persistent_stats_history.cc
new file mode 100644
index 000000000..7cc869cf2
--- /dev/null
+++ b/src/rocksdb/monitoring/persistent_stats_history.cc
@@ -0,0 +1,170 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "monitoring/persistent_stats_history.h"
+
+#include <cstring>
+#include <string>
+#include <utility>
+#include "db/db_impl/db_impl.h"
+#include "port/likely.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+// 10 digit seconds timestamp => [Sep 9, 2001 ~ Nov 20, 2286]
+const int kNowSecondsStringLength = 10;
+const std::string kFormatVersionKeyString =
+ "__persistent_stats_format_version__";
+const std::string kCompatibleVersionKeyString =
+ "__persistent_stats_compatible_version__";
+// Every release maintains two versions numbers for persistents stats: Current
+// format version and compatible format version. Current format version
+// designates what type of encoding will be used when writing to stats CF;
+// compatible format version designates the minimum format version that
+// can decode the stats CF encoded using the current format version.
+const uint64_t kStatsCFCurrentFormatVersion = 1;
+const uint64_t kStatsCFCompatibleFormatVersion = 1;
+
+Status DecodePersistentStatsVersionNumber(DBImpl* db, StatsVersionKeyType type,
+ uint64_t* version_number) {
+ if (type >= StatsVersionKeyType::kKeyTypeMax) {
+ return Status::InvalidArgument("Invalid stats version key type provided");
+ }
+ std::string key;
+ if (type == StatsVersionKeyType::kFormatVersion) {
+ key = kFormatVersionKeyString;
+ } else if (type == StatsVersionKeyType::kCompatibleVersion) {
+ key = kCompatibleVersionKeyString;
+ }
+ ReadOptions options;
+ options.verify_checksums = true;
+ std::string result;
+ Status s = db->Get(options, db->PersistentStatsColumnFamily(), key, &result);
+ if (!s.ok() || result.empty()) {
+ return Status::NotFound("Persistent stats version key " + key +
+ " not found.");
+ }
+
+ // read version_number but do nothing in current version
+ *version_number = ParseUint64(result);
+ return Status::OK();
+}
+
+int EncodePersistentStatsKey(uint64_t now_seconds, const std::string& key,
+ int size, char* buf) {
+ char timestamp[kNowSecondsStringLength + 1];
+ // make time stamp string equal in length to allow sorting by time
+ snprintf(timestamp, sizeof(timestamp), "%010d",
+ static_cast<int>(now_seconds));
+ timestamp[kNowSecondsStringLength] = '\0';
+ return snprintf(buf, size, "%s#%s", timestamp, key.c_str());
+}
+
+void OptimizeForPersistentStats(ColumnFamilyOptions* cfo) {
+ cfo->write_buffer_size = 2 << 20;
+ cfo->target_file_size_base = 2 * 1048576;
+ cfo->max_bytes_for_level_base = 10 * 1048576;
+ cfo->soft_pending_compaction_bytes_limit = 256 * 1048576;
+ cfo->hard_pending_compaction_bytes_limit = 1073741824ul;
+ cfo->compression = kNoCompression;
+}
+
+PersistentStatsHistoryIterator::~PersistentStatsHistoryIterator() {}
+
+bool PersistentStatsHistoryIterator::Valid() const { return valid_; }
+
+Status PersistentStatsHistoryIterator::status() const { return status_; }
+
+void PersistentStatsHistoryIterator::Next() {
+ // increment start_time by 1 to avoid infinite loop
+ AdvanceIteratorByTime(GetStatsTime() + 1, end_time_);
+}
+
+uint64_t PersistentStatsHistoryIterator::GetStatsTime() const { return time_; }
+
+const std::map<std::string, uint64_t>&
+PersistentStatsHistoryIterator::GetStatsMap() const {
+ return stats_map_;
+}
+
+std::pair<uint64_t, std::string> parseKey(const Slice& key,
+ uint64_t start_time) {
+ std::pair<uint64_t, std::string> result;
+ std::string key_str = key.ToString();
+ std::string::size_type pos = key_str.find("#");
+ // TODO(Zhongyi): add counters to track parse failures?
+ if (pos == std::string::npos) {
+ result.first = port::kMaxUint64;
+ result.second.clear();
+ } else {
+ uint64_t parsed_time = ParseUint64(key_str.substr(0, pos));
+ // skip entries with timestamp smaller than start_time
+ if (parsed_time < start_time) {
+ result.first = port::kMaxUint64;
+ result.second = "";
+ } else {
+ result.first = parsed_time;
+ std::string key_resize = key_str.substr(pos + 1);
+ result.second = key_resize;
+ }
+ }
+ return result;
+}
+
+// advance the iterator to the next time between [start_time, end_time)
+// if success, update time_ and stats_map_ with new_time and stats_map
+void PersistentStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time,
+ uint64_t end_time) {
+ // try to find next entry in stats_history_ map
+ if (db_impl_ != nullptr) {
+ ReadOptions ro;
+ Iterator* iter =
+ db_impl_->NewIterator(ro, db_impl_->PersistentStatsColumnFamily());
+
+ char timestamp[kNowSecondsStringLength + 1];
+ snprintf(timestamp, sizeof(timestamp), "%010d",
+ static_cast<int>(std::max(time_, start_time)));
+ timestamp[kNowSecondsStringLength] = '\0';
+
+ iter->Seek(timestamp);
+ // no more entries with timestamp >= start_time is found or version key
+ // is found to be incompatible
+ if (!iter->Valid()) {
+ valid_ = false;
+ delete iter;
+ return;
+ }
+ time_ = parseKey(iter->key(), start_time).first;
+ valid_ = true;
+ // check parsed time and invalid if it exceeds end_time
+ if (time_ > end_time) {
+ valid_ = false;
+ delete iter;
+ return;
+ }
+ // find all entries with timestamp equal to time_
+ std::map<std::string, uint64_t> new_stats_map;
+ std::pair<uint64_t, std::string> kv;
+ for (; iter->Valid(); iter->Next()) {
+ kv = parseKey(iter->key(), start_time);
+ if (kv.first != time_) {
+ break;
+ }
+ if (kv.second.compare(kFormatVersionKeyString) == 0) {
+ continue;
+ }
+ new_stats_map[kv.second] = ParseUint64(iter->value().ToString());
+ }
+ stats_map_.swap(new_stats_map);
+ delete iter;
+ } else {
+ valid_ = false;
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/persistent_stats_history.h b/src/rocksdb/monitoring/persistent_stats_history.h
new file mode 100644
index 000000000..7c711fe4e
--- /dev/null
+++ b/src/rocksdb/monitoring/persistent_stats_history.h
@@ -0,0 +1,83 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/stats_history.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+extern const std::string kFormatVersionKeyString;
+extern const std::string kCompatibleVersionKeyString;
+extern const uint64_t kStatsCFCurrentFormatVersion;
+extern const uint64_t kStatsCFCompatibleFormatVersion;
+
+enum StatsVersionKeyType : uint32_t {
+ kFormatVersion = 1,
+ kCompatibleVersion = 2,
+ kKeyTypeMax = 3
+};
+
+// Read the version number from persitent stats cf depending on type provided
+// stores the version number in `*version_number`
+// returns Status::OK() on success, or other status code on failure
+Status DecodePersistentStatsVersionNumber(DBImpl* db, StatsVersionKeyType type,
+ uint64_t* version_number);
+
+// Encode timestamp and stats key into buf
+// Format: timestamp(10 digit) + '#' + key
+// Total length of encoded key will be capped at 100 bytes
+int EncodePersistentStatsKey(uint64_t timestamp, const std::string& key,
+ int size, char* buf);
+
+void OptimizeForPersistentStats(ColumnFamilyOptions* cfo);
+
+class PersistentStatsHistoryIterator final : public StatsHistoryIterator {
+ public:
+ PersistentStatsHistoryIterator(uint64_t start_time, uint64_t end_time,
+ DBImpl* db_impl)
+ : time_(0),
+ start_time_(start_time),
+ end_time_(end_time),
+ valid_(true),
+ db_impl_(db_impl) {
+ AdvanceIteratorByTime(start_time_, end_time_);
+ }
+ ~PersistentStatsHistoryIterator() override;
+ bool Valid() const override;
+ Status status() const override;
+
+ void Next() override;
+ uint64_t GetStatsTime() const override;
+
+ const std::map<std::string, uint64_t>& GetStatsMap() const override;
+
+ private:
+ // advance the iterator to the next stats history record with timestamp
+ // between [start_time, end_time)
+ void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time);
+
+ // No copying allowed
+ PersistentStatsHistoryIterator(const PersistentStatsHistoryIterator&) =
+ delete;
+ void operator=(const PersistentStatsHistoryIterator&) = delete;
+ PersistentStatsHistoryIterator(PersistentStatsHistoryIterator&&) = delete;
+ PersistentStatsHistoryIterator& operator=(PersistentStatsHistoryIterator&&) =
+ delete;
+
+ uint64_t time_;
+ uint64_t start_time_;
+ uint64_t end_time_;
+ std::map<std::string, uint64_t> stats_map_;
+ Status status_;
+ bool valid_;
+ DBImpl* db_impl_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/statistics.cc b/src/rocksdb/monitoring/statistics.cc
new file mode 100644
index 000000000..ff610c8a1
--- /dev/null
+++ b/src/rocksdb/monitoring/statistics.cc
@@ -0,0 +1,406 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include "monitoring/statistics.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <cstdio>
+#include "port/likely.h"
+#include "rocksdb/statistics.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// The order of items listed in Tickers should be the same as
+// the order listed in TickersNameMap
+const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
+ {BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"},
+ {BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"},
+ {BLOCK_CACHE_ADD, "rocksdb.block.cache.add"},
+ {BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"},
+ {BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"},
+ {BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"},
+ {BLOCK_CACHE_INDEX_ADD, "rocksdb.block.cache.index.add"},
+ {BLOCK_CACHE_INDEX_BYTES_INSERT, "rocksdb.block.cache.index.bytes.insert"},
+ {BLOCK_CACHE_INDEX_BYTES_EVICT, "rocksdb.block.cache.index.bytes.evict"},
+ {BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"},
+ {BLOCK_CACHE_FILTER_HIT, "rocksdb.block.cache.filter.hit"},
+ {BLOCK_CACHE_FILTER_ADD, "rocksdb.block.cache.filter.add"},
+ {BLOCK_CACHE_FILTER_BYTES_INSERT,
+ "rocksdb.block.cache.filter.bytes.insert"},
+ {BLOCK_CACHE_FILTER_BYTES_EVICT, "rocksdb.block.cache.filter.bytes.evict"},
+ {BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss"},
+ {BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit"},
+ {BLOCK_CACHE_DATA_ADD, "rocksdb.block.cache.data.add"},
+ {BLOCK_CACHE_DATA_BYTES_INSERT, "rocksdb.block.cache.data.bytes.insert"},
+ {BLOCK_CACHE_BYTES_READ, "rocksdb.block.cache.bytes.read"},
+ {BLOCK_CACHE_BYTES_WRITE, "rocksdb.block.cache.bytes.write"},
+ {BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful"},
+ {BLOOM_FILTER_FULL_POSITIVE, "rocksdb.bloom.filter.full.positive"},
+ {BLOOM_FILTER_FULL_TRUE_POSITIVE,
+ "rocksdb.bloom.filter.full.true.positive"},
+ {BLOOM_FILTER_MICROS, "rocksdb.bloom.filter.micros"},
+ {PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"},
+ {PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"},
+ {SIM_BLOCK_CACHE_HIT, "rocksdb.sim.block.cache.hit"},
+ {SIM_BLOCK_CACHE_MISS, "rocksdb.sim.block.cache.miss"},
+ {MEMTABLE_HIT, "rocksdb.memtable.hit"},
+ {MEMTABLE_MISS, "rocksdb.memtable.miss"},
+ {GET_HIT_L0, "rocksdb.l0.hit"},
+ {GET_HIT_L1, "rocksdb.l1.hit"},
+ {GET_HIT_L2_AND_UP, "rocksdb.l2andup.hit"},
+ {COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new"},
+ {COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete"},
+ {COMPACTION_KEY_DROP_RANGE_DEL, "rocksdb.compaction.key.drop.range_del"},
+ {COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user"},
+ {COMPACTION_RANGE_DEL_DROP_OBSOLETE,
+ "rocksdb.compaction.range_del.drop.obsolete"},
+ {COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
+ "rocksdb.compaction.optimized.del.drop.obsolete"},
+ {COMPACTION_CANCELLED, "rocksdb.compaction.cancelled"},
+ {NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written"},
+ {NUMBER_KEYS_READ, "rocksdb.number.keys.read"},
+ {NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated"},
+ {BYTES_WRITTEN, "rocksdb.bytes.written"},
+ {BYTES_READ, "rocksdb.bytes.read"},
+ {NUMBER_DB_SEEK, "rocksdb.number.db.seek"},
+ {NUMBER_DB_NEXT, "rocksdb.number.db.next"},
+ {NUMBER_DB_PREV, "rocksdb.number.db.prev"},
+ {NUMBER_DB_SEEK_FOUND, "rocksdb.number.db.seek.found"},
+ {NUMBER_DB_NEXT_FOUND, "rocksdb.number.db.next.found"},
+ {NUMBER_DB_PREV_FOUND, "rocksdb.number.db.prev.found"},
+ {ITER_BYTES_READ, "rocksdb.db.iter.bytes.read"},
+ {NO_FILE_CLOSES, "rocksdb.no.file.closes"},
+ {NO_FILE_OPENS, "rocksdb.no.file.opens"},
+ {NO_FILE_ERRORS, "rocksdb.no.file.errors"},
+ {STALL_L0_SLOWDOWN_MICROS, "rocksdb.l0.slowdown.micros"},
+ {STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros"},
+ {STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros"},
+ {STALL_MICROS, "rocksdb.stall.micros"},
+ {DB_MUTEX_WAIT_MICROS, "rocksdb.db.mutex.wait.micros"},
+ {RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis"},
+ {NO_ITERATORS, "rocksdb.num.iterators"},
+ {NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get"},
+ {NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read"},
+ {NUMBER_MULTIGET_BYTES_READ, "rocksdb.number.multiget.bytes.read"},
+ {NUMBER_FILTERED_DELETES, "rocksdb.number.deletes.filtered"},
+ {NUMBER_MERGE_FAILURES, "rocksdb.number.merge.failures"},
+ {BLOOM_FILTER_PREFIX_CHECKED, "rocksdb.bloom.filter.prefix.checked"},
+ {BLOOM_FILTER_PREFIX_USEFUL, "rocksdb.bloom.filter.prefix.useful"},
+ {NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration"},
+ {GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"},
+ {BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"},
+ {BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"},
+ {BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"},
+ {BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
+ "rocksdb.block.cachecompressed.add.failures"},
+ {WAL_FILE_SYNCED, "rocksdb.wal.synced"},
+ {WAL_FILE_BYTES, "rocksdb.wal.bytes"},
+ {WRITE_DONE_BY_SELF, "rocksdb.write.self"},
+ {WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
+ {WRITE_TIMEDOUT, "rocksdb.write.timeout"},
+ {WRITE_WITH_WAL, "rocksdb.write.wal"},
+ {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
+ {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
+ {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
+ {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
+ "rocksdb.number.direct.load.table.properties"},
+ {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
+ {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
+ {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
+ {NUMBER_BLOCK_COMPRESSED, "rocksdb.number.block.compressed"},
+ {NUMBER_BLOCK_DECOMPRESSED, "rocksdb.number.block.decompressed"},
+ {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"},
+ {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"},
+ {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
+ {ROW_CACHE_HIT, "rocksdb.row.cache.hit"},
+ {ROW_CACHE_MISS, "rocksdb.row.cache.miss"},
+ {READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"},
+ {READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"},
+ {NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"},
+ {NUMBER_ITER_SKIP, "rocksdb.number.iter.skip"},
+ {BLOB_DB_NUM_PUT, "rocksdb.blobdb.num.put"},
+ {BLOB_DB_NUM_WRITE, "rocksdb.blobdb.num.write"},
+ {BLOB_DB_NUM_GET, "rocksdb.blobdb.num.get"},
+ {BLOB_DB_NUM_MULTIGET, "rocksdb.blobdb.num.multiget"},
+ {BLOB_DB_NUM_SEEK, "rocksdb.blobdb.num.seek"},
+ {BLOB_DB_NUM_NEXT, "rocksdb.blobdb.num.next"},
+ {BLOB_DB_NUM_PREV, "rocksdb.blobdb.num.prev"},
+ {BLOB_DB_NUM_KEYS_WRITTEN, "rocksdb.blobdb.num.keys.written"},
+ {BLOB_DB_NUM_KEYS_READ, "rocksdb.blobdb.num.keys.read"},
+ {BLOB_DB_BYTES_WRITTEN, "rocksdb.blobdb.bytes.written"},
+ {BLOB_DB_BYTES_READ, "rocksdb.blobdb.bytes.read"},
+ {BLOB_DB_WRITE_INLINED, "rocksdb.blobdb.write.inlined"},
+ {BLOB_DB_WRITE_INLINED_TTL, "rocksdb.blobdb.write.inlined.ttl"},
+ {BLOB_DB_WRITE_BLOB, "rocksdb.blobdb.write.blob"},
+ {BLOB_DB_WRITE_BLOB_TTL, "rocksdb.blobdb.write.blob.ttl"},
+ {BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"},
+ {BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file.bytes.read"},
+ {BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"},
+ {BLOB_DB_BLOB_INDEX_EXPIRED_COUNT,
+ "rocksdb.blobdb.blob.index.expired.count"},
+ {BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, "rocksdb.blobdb.blob.index.expired.size"},
+ {BLOB_DB_BLOB_INDEX_EVICTED_COUNT,
+ "rocksdb.blobdb.blob.index.evicted.count"},
+ {BLOB_DB_BLOB_INDEX_EVICTED_SIZE, "rocksdb.blobdb.blob.index.evicted.size"},
+ {BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"},
+ {BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"},
+ {BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"},
+ {BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, "rocksdb.blobdb.gc.num.keys.overwritten"},
+ {BLOB_DB_GC_NUM_KEYS_EXPIRED, "rocksdb.blobdb.gc.num.keys.expired"},
+ {BLOB_DB_GC_NUM_KEYS_RELOCATED, "rocksdb.blobdb.gc.num.keys.relocated"},
+ {BLOB_DB_GC_BYTES_OVERWRITTEN, "rocksdb.blobdb.gc.bytes.overwritten"},
+ {BLOB_DB_GC_BYTES_EXPIRED, "rocksdb.blobdb.gc.bytes.expired"},
+ {BLOB_DB_GC_BYTES_RELOCATED, "rocksdb.blobdb.gc.bytes.relocated"},
+ {BLOB_DB_FIFO_NUM_FILES_EVICTED, "rocksdb.blobdb.fifo.num.files.evicted"},
+ {BLOB_DB_FIFO_NUM_KEYS_EVICTED, "rocksdb.blobdb.fifo.num.keys.evicted"},
+ {BLOB_DB_FIFO_BYTES_EVICTED, "rocksdb.blobdb.fifo.bytes.evicted"},
+ {TXN_PREPARE_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.prepare"},
+ {TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD,
+ "rocksdb.txn.overhead.mutex.old.commit.map"},
+ {TXN_DUPLICATE_KEY_OVERHEAD, "rocksdb.txn.overhead.duplicate.key"},
+ {TXN_SNAPSHOT_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.snapshot"},
+ {TXN_GET_TRY_AGAIN, "rocksdb.txn.get.tryagain"},
+ {NUMBER_MULTIGET_KEYS_FOUND, "rocksdb.number.multiget.keys.found"},
+ {NO_ITERATOR_CREATED, "rocksdb.num.iterator.created"},
+ {NO_ITERATOR_DELETED, "rocksdb.num.iterator.deleted"},
+ {BLOCK_CACHE_COMPRESSION_DICT_MISS,
+ "rocksdb.block.cache.compression.dict.miss"},
+ {BLOCK_CACHE_COMPRESSION_DICT_HIT,
+ "rocksdb.block.cache.compression.dict.hit"},
+ {BLOCK_CACHE_COMPRESSION_DICT_ADD,
+ "rocksdb.block.cache.compression.dict.add"},
+ {BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
+ "rocksdb.block.cache.compression.dict.bytes.insert"},
+ {BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
+ "rocksdb.block.cache.compression.dict.bytes.evict"},
+};
+
+const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
+ {DB_GET, "rocksdb.db.get.micros"},
+ {DB_WRITE, "rocksdb.db.write.micros"},
+ {COMPACTION_TIME, "rocksdb.compaction.times.micros"},
+ {COMPACTION_CPU_TIME, "rocksdb.compaction.times.cpu_micros"},
+ {SUBCOMPACTION_SETUP_TIME, "rocksdb.subcompaction.setup.times.micros"},
+ {TABLE_SYNC_MICROS, "rocksdb.table.sync.micros"},
+ {COMPACTION_OUTFILE_SYNC_MICROS, "rocksdb.compaction.outfile.sync.micros"},
+ {WAL_FILE_SYNC_MICROS, "rocksdb.wal.file.sync.micros"},
+ {MANIFEST_FILE_SYNC_MICROS, "rocksdb.manifest.file.sync.micros"},
+ {TABLE_OPEN_IO_MICROS, "rocksdb.table.open.io.micros"},
+ {DB_MULTIGET, "rocksdb.db.multiget.micros"},
+ {READ_BLOCK_COMPACTION_MICROS, "rocksdb.read.block.compaction.micros"},
+ {READ_BLOCK_GET_MICROS, "rocksdb.read.block.get.micros"},
+ {WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros"},
+ {STALL_L0_SLOWDOWN_COUNT, "rocksdb.l0.slowdown.count"},
+ {STALL_MEMTABLE_COMPACTION_COUNT, "rocksdb.memtable.compaction.count"},
+ {STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"},
+ {HARD_RATE_LIMIT_DELAY_COUNT, "rocksdb.hard.rate.limit.delay.count"},
+ {SOFT_RATE_LIMIT_DELAY_COUNT, "rocksdb.soft.rate.limit.delay.count"},
+ {NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction"},
+ {DB_SEEK, "rocksdb.db.seek.micros"},
+ {WRITE_STALL, "rocksdb.db.write.stall"},
+ {SST_READ_MICROS, "rocksdb.sst.read.micros"},
+ {NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"},
+ {BYTES_PER_READ, "rocksdb.bytes.per.read"},
+ {BYTES_PER_WRITE, "rocksdb.bytes.per.write"},
+ {BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"},
+ {BYTES_COMPRESSED, "rocksdb.bytes.compressed"},
+ {BYTES_DECOMPRESSED, "rocksdb.bytes.decompressed"},
+ {COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"},
+ {DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"},
+ {READ_NUM_MERGE_OPERANDS, "rocksdb.read.num.merge_operands"},
+ {BLOB_DB_KEY_SIZE, "rocksdb.blobdb.key.size"},
+ {BLOB_DB_VALUE_SIZE, "rocksdb.blobdb.value.size"},
+ {BLOB_DB_WRITE_MICROS, "rocksdb.blobdb.write.micros"},
+ {BLOB_DB_GET_MICROS, "rocksdb.blobdb.get.micros"},
+ {BLOB_DB_MULTIGET_MICROS, "rocksdb.blobdb.multiget.micros"},
+ {BLOB_DB_SEEK_MICROS, "rocksdb.blobdb.seek.micros"},
+ {BLOB_DB_NEXT_MICROS, "rocksdb.blobdb.next.micros"},
+ {BLOB_DB_PREV_MICROS, "rocksdb.blobdb.prev.micros"},
+ {BLOB_DB_BLOB_FILE_WRITE_MICROS, "rocksdb.blobdb.blob.file.write.micros"},
+ {BLOB_DB_BLOB_FILE_READ_MICROS, "rocksdb.blobdb.blob.file.read.micros"},
+ {BLOB_DB_BLOB_FILE_SYNC_MICROS, "rocksdb.blobdb.blob.file.sync.micros"},
+ {BLOB_DB_GC_MICROS, "rocksdb.blobdb.gc.micros"},
+ {BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"},
+ {BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"},
+ {FLUSH_TIME, "rocksdb.db.flush.micros"},
+ {SST_BATCH_SIZE, "rocksdb.sst.batch.size"},
+};
+
+std::shared_ptr<Statistics> CreateDBStatistics() {
+ return std::make_shared<StatisticsImpl>(nullptr);
+}
+
+StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats)
+ : stats_(std::move(stats)) {}
+
+StatisticsImpl::~StatisticsImpl() {}
+
+uint64_t StatisticsImpl::getTickerCount(uint32_t tickerType) const {
+ MutexLock lock(&aggregate_lock_);
+ return getTickerCountLocked(tickerType);
+}
+
+uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const {
+ assert(tickerType < TICKER_ENUM_MAX);
+ uint64_t res = 0;
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType];
+ }
+ return res;
+}
+
+void StatisticsImpl::histogramData(uint32_t histogramType,
+ HistogramData* const data) const {
+ MutexLock lock(&aggregate_lock_);
+ getHistogramImplLocked(histogramType)->Data(data);
+}
+
+std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked(
+ uint32_t histogramType) const {
+ assert(histogramType < HISTOGRAM_ENUM_MAX);
+ std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl());
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ res_hist->Merge(
+ per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]);
+ }
+ return res_hist;
+}
+
+std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const {
+ MutexLock lock(&aggregate_lock_);
+ return getHistogramImplLocked(histogramType)->ToString();
+}
+
+void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) {
+ {
+ MutexLock lock(&aggregate_lock_);
+ setTickerCountLocked(tickerType, count);
+ }
+ if (stats_ && tickerType < TICKER_ENUM_MAX) {
+ stats_->setTickerCount(tickerType, count);
+ }
+}
+
+void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) {
+ assert(tickerType < TICKER_ENUM_MAX);
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ if (core_idx == 0) {
+ per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count;
+ } else {
+ per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0;
+ }
+ }
+}
+
+uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) {
+ uint64_t sum = 0;
+ {
+ MutexLock lock(&aggregate_lock_);
+ assert(tickerType < TICKER_ENUM_MAX);
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ sum +=
+ per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange(
+ 0, std::memory_order_relaxed);
+ }
+ }
+ if (stats_ && tickerType < TICKER_ENUM_MAX) {
+ stats_->setTickerCount(tickerType, 0);
+ }
+ return sum;
+}
+
+void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) {
+ assert(tickerType < TICKER_ENUM_MAX);
+ per_core_stats_.Access()->tickers_[tickerType].fetch_add(
+ count, std::memory_order_relaxed);
+ if (stats_ && tickerType < TICKER_ENUM_MAX) {
+ stats_->recordTick(tickerType, count);
+ }
+}
+
+void StatisticsImpl::recordInHistogram(uint32_t histogramType, uint64_t value) {
+ assert(histogramType < HISTOGRAM_ENUM_MAX);
+ if (get_stats_level() <= StatsLevel::kExceptHistogramOrTimers) {
+ return;
+ }
+ per_core_stats_.Access()->histograms_[histogramType].Add(value);
+ if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) {
+ stats_->recordInHistogram(histogramType, value);
+ }
+}
+
+Status StatisticsImpl::Reset() {
+ MutexLock lock(&aggregate_lock_);
+ for (uint32_t i = 0; i < TICKER_ENUM_MAX; ++i) {
+ setTickerCountLocked(i, 0);
+ }
+ for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) {
+ for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) {
+ per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear();
+ }
+ }
+ return Status::OK();
+}
+
+namespace {
+
+// a buffer size used for temp string buffers
+const int kTmpStrBufferSize = 200;
+
+} // namespace
+
+std::string StatisticsImpl::ToString() const {
+ MutexLock lock(&aggregate_lock_);
+ std::string res;
+ res.reserve(20000);
+ for (const auto& t : TickersNameMap) {
+ assert(t.first < TICKER_ENUM_MAX);
+ char buffer[kTmpStrBufferSize];
+ snprintf(buffer, kTmpStrBufferSize, "%s COUNT : %" PRIu64 "\n",
+ t.second.c_str(), getTickerCountLocked(t.first));
+ res.append(buffer);
+ }
+ for (const auto& h : HistogramsNameMap) {
+ assert(h.first < HISTOGRAM_ENUM_MAX);
+ char buffer[kTmpStrBufferSize];
+ HistogramData hData;
+ getHistogramImplLocked(h.first)->Data(&hData);
+ // don't handle failures - buffer should always be big enough and arguments
+ // should be provided correctly
+ int ret =
+ snprintf(buffer, kTmpStrBufferSize,
+ "%s P50 : %f P95 : %f P99 : %f P100 : %f COUNT : %" PRIu64
+ " SUM : %" PRIu64 "\n",
+ h.second.c_str(), hData.median, hData.percentile95,
+ hData.percentile99, hData.max, hData.count, hData.sum);
+ if (ret < 0 || ret >= kTmpStrBufferSize) {
+ assert(false);
+ continue;
+ }
+ res.append(buffer);
+ }
+ res.shrink_to_fit();
+ return res;
+}
+
+bool StatisticsImpl::getTickerMap(
+ std::map<std::string, uint64_t>* stats_map) const {
+ assert(stats_map);
+ if (!stats_map) return false;
+ stats_map->clear();
+ MutexLock lock(&aggregate_lock_);
+ for (const auto& t : TickersNameMap) {
+ assert(t.first < TICKER_ENUM_MAX);
+ (*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first);
+ }
+ return true;
+}
+
+bool StatisticsImpl::HistEnabledForType(uint32_t type) const {
+ return type < HISTOGRAM_ENUM_MAX;
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/statistics.h b/src/rocksdb/monitoring/statistics.h
new file mode 100644
index 000000000..f633aa4ef
--- /dev/null
+++ b/src/rocksdb/monitoring/statistics.h
@@ -0,0 +1,138 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "rocksdb/statistics.h"
+
+#include <atomic>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "monitoring/histogram.h"
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/core_local.h"
+#include "util/mutexlock.h"
+
+#ifdef __clang__
+#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
+#else
+#define ROCKSDB_FIELD_UNUSED
+#endif // __clang__
+
+#ifndef STRINGIFY
+#define STRINGIFY(x) #x
+#define TOSTRING(x) STRINGIFY(x)
+#endif
+
+namespace ROCKSDB_NAMESPACE {
+
+enum TickersInternal : uint32_t {
+ INTERNAL_TICKER_ENUM_START = TICKER_ENUM_MAX,
+ INTERNAL_TICKER_ENUM_MAX
+};
+
+enum HistogramsInternal : uint32_t {
+ INTERNAL_HISTOGRAM_START = HISTOGRAM_ENUM_MAX,
+ INTERNAL_HISTOGRAM_ENUM_MAX
+};
+
+class StatisticsImpl : public Statistics {
+ public:
+ StatisticsImpl(std::shared_ptr<Statistics> stats);
+ virtual ~StatisticsImpl();
+
+ virtual uint64_t getTickerCount(uint32_t ticker_type) const override;
+ virtual void histogramData(uint32_t histogram_type,
+ HistogramData* const data) const override;
+ std::string getHistogramString(uint32_t histogram_type) const override;
+
+ virtual void setTickerCount(uint32_t ticker_type, uint64_t count) override;
+ virtual uint64_t getAndResetTickerCount(uint32_t ticker_type) override;
+ virtual void recordTick(uint32_t ticker_type, uint64_t count) override;
+ // The function is implemented for now for backward compatibility reason.
+ // In case a user explictly calls it, for example, they may have a wrapped
+ // Statistics object, passing the call to recordTick() into here, nothing
+ // will break.
+ void measureTime(uint32_t histogramType, uint64_t time) override {
+ recordInHistogram(histogramType, time);
+ }
+ virtual void recordInHistogram(uint32_t histogram_type,
+ uint64_t value) override;
+
+ virtual Status Reset() override;
+ virtual std::string ToString() const override;
+ virtual bool getTickerMap(std::map<std::string, uint64_t>*) const override;
+ virtual bool HistEnabledForType(uint32_t type) const override;
+
+ private:
+ // If non-nullptr, forwards updates to the object pointed to by `stats_`.
+ std::shared_ptr<Statistics> stats_;
+ // Synchronizes anything that operates across other cores' local data,
+ // such that operations like Reset() can be performed atomically.
+ mutable port::Mutex aggregate_lock_;
+
+ // The ticker/histogram data are stored in this structure, which we will store
+ // per-core. It is cache-aligned, so tickers/histograms belonging to different
+ // cores can never share the same cache line.
+ //
+ // Alignment attributes expand to nothing depending on the platform
+ struct ALIGN_AS(CACHE_LINE_SIZE) StatisticsData {
+ std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX] = {{0}};
+ HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];
+#ifndef HAVE_ALIGNED_NEW
+ char
+ padding[(CACHE_LINE_SIZE -
+ (INTERNAL_TICKER_ENUM_MAX * sizeof(std::atomic_uint_fast64_t) +
+ INTERNAL_HISTOGRAM_ENUM_MAX * sizeof(HistogramImpl)) %
+ CACHE_LINE_SIZE)] ROCKSDB_FIELD_UNUSED;
+#endif
+ void *operator new(size_t s) { return port::cacheline_aligned_alloc(s); }
+ void *operator new[](size_t s) { return port::cacheline_aligned_alloc(s); }
+ void operator delete(void *p) { port::cacheline_aligned_free(p); }
+ void operator delete[](void *p) { port::cacheline_aligned_free(p); }
+ };
+
+ static_assert(sizeof(StatisticsData) % CACHE_LINE_SIZE == 0, "Expected " TOSTRING(CACHE_LINE_SIZE) "-byte aligned");
+
+ CoreLocalArray<StatisticsData> per_core_stats_;
+
+ uint64_t getTickerCountLocked(uint32_t ticker_type) const;
+ std::unique_ptr<HistogramImpl> getHistogramImplLocked(
+ uint32_t histogram_type) const;
+ void setTickerCountLocked(uint32_t ticker_type, uint64_t count);
+};
+
+// Utility functions
+inline void RecordInHistogram(Statistics* statistics, uint32_t histogram_type,
+ uint64_t value) {
+ if (statistics) {
+ statistics->recordInHistogram(histogram_type, value);
+ }
+}
+
+inline void RecordTimeToHistogram(Statistics* statistics,
+ uint32_t histogram_type, uint64_t value) {
+ if (statistics) {
+ statistics->reportTimeToHistogram(histogram_type, value);
+ }
+}
+
+inline void RecordTick(Statistics* statistics, uint32_t ticker_type,
+ uint64_t count = 1) {
+ if (statistics) {
+ statistics->recordTick(ticker_type, count);
+ }
+}
+
+inline void SetTickerCount(Statistics* statistics, uint32_t ticker_type,
+ uint64_t count) {
+ if (statistics) {
+ statistics->setTickerCount(ticker_type, count);
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/statistics_test.cc b/src/rocksdb/monitoring/statistics_test.cc
new file mode 100644
index 000000000..e497afcbe
--- /dev/null
+++ b/src/rocksdb/monitoring/statistics_test.cc
@@ -0,0 +1,47 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#include "port/stack_trace.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+
+#include "rocksdb/statistics.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class StatisticsTest : public testing::Test {};
+
+// Sanity check to make sure that contents and order of TickersNameMap
+// match Tickers enum
+TEST_F(StatisticsTest, SanityTickers) {
+ EXPECT_EQ(static_cast<size_t>(Tickers::TICKER_ENUM_MAX),
+ TickersNameMap.size());
+
+ for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
+ auto pair = TickersNameMap[static_cast<size_t>(t)];
+ ASSERT_EQ(pair.first, t) << "Miss match at " << pair.second;
+ }
+}
+
+// Sanity check to make sure that contents and order of HistogramsNameMap
+// match Tickers enum
+TEST_F(StatisticsTest, SanityHistograms) {
+ EXPECT_EQ(static_cast<size_t>(Histograms::HISTOGRAM_ENUM_MAX),
+ HistogramsNameMap.size());
+
+ for (uint32_t h = 0; h < Histograms::HISTOGRAM_ENUM_MAX; h++) {
+ auto pair = HistogramsNameMap[static_cast<size_t>(h)];
+ ASSERT_EQ(pair.first, h) << "Miss match at " << pair.second;
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/monitoring/stats_history_test.cc b/src/rocksdb/monitoring/stats_history_test.cc
new file mode 100644
index 000000000..56b8dc4f5
--- /dev/null
+++ b/src/rocksdb/monitoring/stats_history_test.cc
@@ -0,0 +1,653 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <limits>
+#include <string>
+#include <unordered_map>
+
+#include "db/column_family.h"
+#include "db/db_impl/db_impl.h"
+#include "db/db_test_util.h"
+#include "monitoring/persistent_stats_history.h"
+#include "options/options_helper.h"
+#include "port/stack_trace.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/rate_limiter.h"
+#include "rocksdb/stats_history.h"
+#include "test_util/sync_point.h"
+#include "test_util/testutil.h"
+#include "util/random.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class StatsHistoryTest : public DBTestBase {
+ public:
+ StatsHistoryTest() : DBTestBase("/stats_history_test") {}
+};
+#ifndef ROCKSDB_LITE
+
+TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_dump_period_sec = 5;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ int counter = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+#if defined(OS_MACOSX) && !defined(NDEBUG)
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
+ uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
+ if (time_us < mock_env->RealNowMicros()) {
+ *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
+ }
+ });
+#endif // OS_MACOSX && !NDEBUG
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Reopen(options);
+ ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
+ dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); });
+ ASSERT_GE(counter, 1);
+
+ // Test cacel job through SetOptions
+ ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}}));
+ int old_val = counter;
+ for (int i = 6; i < 20; ++i) {
+ dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); });
+ }
+ ASSERT_EQ(counter, old_val);
+ Close();
+}
+
+// Test persistent stats background thread scheduling and cancelling
+TEST_F(StatsHistoryTest, StatsPersistScheduling) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_persist_period_sec = 5;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+#if defined(OS_MACOSX) && !defined(NDEBUG)
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
+ uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
+ if (time_us < mock_env->RealNowMicros()) {
+ *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
+ }
+ });
+#endif // OS_MACOSX && !NDEBUG
+ int counter = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Reopen(options);
+ ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ ASSERT_GE(counter, 1);
+
+ // Test cacel job through SetOptions
+ ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled());
+ ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}}));
+ ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled());
+ Close();
+}
+
+// Test enabling persistent stats for the first time
+TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_persist_period_sec = 0;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+#if defined(OS_MACOSX) && !defined(NDEBUG)
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
+ uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
+ if (time_us < mock_env->RealNowMicros()) {
+ *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
+ }
+ });
+#endif // OS_MACOSX && !NDEBUG
+ int counter = 0;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Reopen(options);
+ ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}}));
+ ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ ASSERT_GE(counter, 1);
+ Close();
+}
+
+// TODO(Zhongyi): Move persistent stats related tests to a separate file
+TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_persist_period_sec = 5;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+#if defined(OS_MACOSX) && !defined(NDEBUG)
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
+ uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
+ if (time_us < mock_env->RealNowMicros()) {
+ *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+#endif // OS_MACOSX && !NDEBUG
+
+ CreateColumnFamilies({"pikachu"}, options);
+ ASSERT_OK(Put("foo", "bar"));
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+
+ int mock_time = 1;
+ // Wait for stats persist to finish
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ std::unique_ptr<StatsHistoryIterator> stats_iter;
+ db_->GetStatsHistory(0 /*start_time*/, 6 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ // disabled stats snapshots
+ ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}}));
+ size_t stats_count = 0;
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ auto stats_map = stats_iter->GetStatsMap();
+ ASSERT_EQ(stats_iter->GetStatsTime(), 5);
+ stats_count += stats_map.size();
+ }
+ ASSERT_GT(stats_count, 0);
+ // Wait a bit and verify no more stats are found
+ for (mock_time = 6; mock_time < 20; ++mock_time) {
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(mock_time); });
+ }
+ db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ size_t stats_count_new = 0;
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ stats_count_new += stats_iter->GetStatsMap().size();
+ }
+ ASSERT_EQ(stats_count_new, stats_count);
+ Close();
+}
+
+TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) {
+ Options options;
+ options.create_if_missing = true;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ options.stats_persist_period_sec = 1;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+#if defined(OS_MACOSX) && !defined(NDEBUG)
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
+ uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
+ if (time_us < mock_env->RealNowMicros()) {
+ *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+#endif // OS_MACOSX && !NDEBUG
+
+ CreateColumnFamilies({"pikachu"}, options);
+ ASSERT_OK(Put("foo", "bar"));
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ // some random operation to populate statistics
+ ASSERT_OK(Delete("foo"));
+ ASSERT_OK(Put("sol", "sol"));
+ ASSERT_OK(Put("epic", "epic"));
+ ASSERT_OK(Put("ltd", "ltd"));
+ ASSERT_EQ("sol", Get("sol"));
+ ASSERT_EQ("epic", Get("epic"));
+ ASSERT_EQ("ltd", Get("ltd"));
+ Iterator* iterator = db_->NewIterator(ReadOptions());
+ for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
+ ASSERT_TRUE(iterator->key() == iterator->value());
+ }
+ delete iterator;
+ ASSERT_OK(Flush());
+ ASSERT_OK(Delete("sol"));
+ db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ int mock_time = 1;
+ // Wait for stats persist to finish
+ for (; mock_time < 5; ++mock_time) {
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(mock_time); });
+ }
+
+ // second round of ops
+ ASSERT_OK(Put("saigon", "saigon"));
+ ASSERT_OK(Put("noodle talk", "noodle talk"));
+ ASSERT_OK(Put("ping bistro", "ping bistro"));
+ iterator = db_->NewIterator(ReadOptions());
+ for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
+ ASSERT_TRUE(iterator->key() == iterator->value());
+ }
+ delete iterator;
+ ASSERT_OK(Flush());
+ db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ for (; mock_time < 10; ++mock_time) {
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(mock_time); });
+ }
+ std::unique_ptr<StatsHistoryIterator> stats_iter;
+ db_->GetStatsHistory(0 /*start_time*/, 10 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ size_t stats_count = 0;
+ int slice_count = 0;
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ slice_count++;
+ auto stats_map = stats_iter->GetStatsMap();
+ stats_count += stats_map.size();
+ }
+ size_t stats_history_size = dbfull()->TEST_EstimateInMemoryStatsHistorySize();
+ ASSERT_GE(slice_count, 9);
+ ASSERT_GE(stats_history_size, 12000);
+ // capping memory cost at 12000 bytes since one slice is around 10000~12000
+ ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}}));
+ ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size);
+ // Wait for stats persist to finish
+ for (; mock_time < 20; ++mock_time) {
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(mock_time); });
+ }
+ db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ size_t stats_count_reopen = 0;
+ slice_count = 0;
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ slice_count++;
+ auto stats_map = stats_iter->GetStatsMap();
+ stats_count_reopen += stats_map.size();
+ }
+ size_t stats_history_size_reopen =
+ dbfull()->TEST_EstimateInMemoryStatsHistorySize();
+ // only one slice can fit under the new stats_history_buffer_size
+ ASSERT_LT(slice_count, 2);
+ ASSERT_TRUE(stats_history_size_reopen < 12000 &&
+ stats_history_size_reopen > 0);
+ ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0);
+ Close();
+ // TODO: may also want to verify stats timestamp to make sure we are purging
+ // the correct stats snapshot
+}
+
+int countkeys(Iterator* iter) {
+ int count = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ count++;
+ }
+ return count;
+}
+
+TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_persist_period_sec = 5;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ options.persist_stats_to_disk = true;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ CreateColumnFamilies({"pikachu"}, options);
+ ASSERT_OK(Put("foo", "bar"));
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ ASSERT_EQ(Get("foo"), "bar");
+
+ // Wait for stats persist to finish
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ auto iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ int key_count1 = countkeys(iter);
+ delete iter;
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(10); });
+ iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ int key_count2 = countkeys(iter);
+ delete iter;
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(15); });
+ iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ int key_count3 = countkeys(iter);
+ delete iter;
+ ASSERT_GE(key_count2, key_count1);
+ ASSERT_GE(key_count3, key_count2);
+ ASSERT_EQ(key_count3 - key_count2, key_count2 - key_count1);
+ std::unique_ptr<StatsHistoryIterator> stats_iter;
+ db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ size_t stats_count = 0;
+ int slice_count = 0;
+ int non_zero_count = 0;
+ for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) {
+ slice_count++;
+ auto stats_map = stats_iter->GetStatsMap();
+ ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i);
+ for (auto& stat : stats_map) {
+ if (stat.second != 0) {
+ non_zero_count++;
+ }
+ }
+ stats_count += stats_map.size();
+ }
+ ASSERT_EQ(slice_count, 3);
+ // 2 extra keys for format version
+ ASSERT_EQ(stats_count, key_count3 - 2);
+ // verify reopen will not cause data loss
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ size_t stats_count_reopen = 0;
+ int slice_count_reopen = 0;
+ int non_zero_count_recover = 0;
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ slice_count_reopen++;
+ auto stats_map = stats_iter->GetStatsMap();
+ for (auto& stat : stats_map) {
+ if (stat.second != 0) {
+ non_zero_count_recover++;
+ }
+ }
+ stats_count_reopen += stats_map.size();
+ }
+ ASSERT_EQ(non_zero_count, non_zero_count_recover);
+ ASSERT_EQ(slice_count, slice_count_reopen);
+ ASSERT_EQ(stats_count, stats_count_reopen);
+ Close();
+}
+
+// Test persisted stats matches the value found in options.statistics and
+// the stats value retains after DB reopen
+TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_persist_period_sec = 5;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ options.persist_stats_to_disk = true;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ std::map<std::string, uint64_t> stats_map_before;
+ ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_before));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ CreateColumnFamilies({"pikachu"}, options);
+ ASSERT_OK(Put("foo", "bar"));
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ ASSERT_EQ(Get("foo"), "bar");
+
+ // Wait for stats persist to finish
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ auto iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ countkeys(iter);
+ delete iter;
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(10); });
+ iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ countkeys(iter);
+ delete iter;
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(15); });
+ iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ countkeys(iter);
+ delete iter;
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(20); });
+
+ std::map<std::string, uint64_t> stats_map_after;
+ ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_after));
+ std::unique_ptr<StatsHistoryIterator> stats_iter;
+ db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ std::string sample = "rocksdb.num.iterator.deleted";
+ uint64_t recovered_value = 0;
+ for (int i = 1; stats_iter->Valid(); stats_iter->Next(), ++i) {
+ auto stats_map = stats_iter->GetStatsMap();
+ ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i);
+ for (const auto& stat : stats_map) {
+ if (sample.compare(stat.first) == 0) {
+ recovered_value += stat.second;
+ }
+ }
+ }
+ ASSERT_EQ(recovered_value, stats_map_after[sample]);
+
+ // test stats value retains after recovery
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ uint64_t new_recovered_value = 0;
+ for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) {
+ auto stats_map = stats_iter->GetStatsMap();
+ ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i);
+ for (const auto& stat : stats_map) {
+ if (sample.compare(stat.first) == 0) {
+ new_recovered_value += stat.second;
+ }
+ }
+ }
+ ASSERT_EQ(recovered_value, new_recovered_value);
+
+ // TODO(Zhongyi): also add test to read raw values from disk and verify
+ // correctness
+ Close();
+}
+
+// TODO(Zhongyi): add test for different format versions
+
+TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) {
+ Options options;
+ options.create_if_missing = true;
+ options.stats_persist_period_sec = 5;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ options.persist_stats_to_disk = true;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ ASSERT_OK(TryReopen(options));
+ CreateColumnFamilies({"one", "two", "three"}, options);
+ ASSERT_OK(Put(1, "foo", "bar"));
+ ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
+ ASSERT_EQ(Get(2, "foo"), "bar");
+ CreateColumnFamilies({"four"}, options);
+ ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options);
+ ASSERT_EQ(Get(2, "foo"), "bar");
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ auto iter =
+ db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily());
+ int key_count = countkeys(iter);
+ delete iter;
+ ASSERT_GE(key_count, 0);
+ uint64_t num_write_wal = 0;
+ std::string sample = "rocksdb.write.wal";
+ std::unique_ptr<StatsHistoryIterator> stats_iter;
+ db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ auto stats_map = stats_iter->GetStatsMap();
+ for (const auto& stat : stats_map) {
+ if (sample.compare(stat.first) == 0) {
+ num_write_wal += stat.second;
+ }
+ }
+ }
+ stats_iter.reset();
+ ASSERT_EQ(num_write_wal, 2);
+
+ options.persist_stats_to_disk = false;
+ ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options);
+ int cf_count = 0;
+ for (auto cfd : *dbfull()->versions_->GetColumnFamilySet()) {
+ (void)cfd;
+ cf_count++;
+ }
+ // persistent stats cf will be implicitly opened even if
+ // persist_stats_to_disk is false
+ ASSERT_EQ(cf_count, 6);
+ ASSERT_EQ(Get(2, "foo"), "bar");
+
+ // attempt to create column family using same name, should fail
+ ColumnFamilyOptions cf_opts(options);
+ ColumnFamilyHandle* handle;
+ ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName,
+ &handle));
+
+ options.persist_stats_to_disk = true;
+ ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options);
+ ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName,
+ &handle));
+ // verify stats is not affected by prior failed CF creation
+ db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter);
+ ASSERT_TRUE(stats_iter != nullptr);
+ num_write_wal = 0;
+ for (; stats_iter->Valid(); stats_iter->Next()) {
+ auto stats_map = stats_iter->GetStatsMap();
+ for (const auto& stat : stats_map) {
+ if (sample.compare(stat.first) == 0) {
+ num_write_wal += stat.second;
+ }
+ }
+ }
+ ASSERT_EQ(num_write_wal, 2);
+
+ Close();
+ Destroy(options);
+}
+
+TEST_F(StatsHistoryTest, PersistentStatsReadOnly) {
+ ASSERT_OK(Put("bar", "v2"));
+ Close();
+
+ auto options = CurrentOptions();
+ options.stats_persist_period_sec = 5;
+ options.persist_stats_to_disk = true;
+ assert(options.env == env_);
+ ASSERT_OK(ReadOnlyReopen(options));
+ ASSERT_EQ("v2", Get("bar"));
+ Close();
+
+ // Reopen and flush memtable.
+ ASSERT_OK(TryReopen(options));
+ Flush();
+ Close();
+ // Now check keys in read only mode.
+ ASSERT_OK(ReadOnlyReopen(options));
+}
+
+TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) {
+ Options options;
+ options.create_if_missing = true;
+ options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb
+ options.stats_persist_period_sec = 5;
+ options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+ options.persist_stats_to_disk = true;
+ std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env;
+ mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
+ mock_env->set_current_time(0); // in seconds
+ options.env = mock_env.get();
+ CreateColumnFamilies({"pikachu"}, options);
+ ReopenWithColumnFamilies({"default", "pikachu"}, options);
+ ColumnFamilyData* cfd_default =
+ static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
+ ->cfd();
+ ColumnFamilyData* cfd_stats = static_cast<ColumnFamilyHandleImpl*>(
+ dbfull()->PersistentStatsColumnFamily())
+ ->cfd();
+ ColumnFamilyData* cfd_test =
+ static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
+
+ ASSERT_OK(Put("foo", "v0"));
+ ASSERT_OK(Put("bar", "v0"));
+ ASSERT_EQ("v0", Get("bar"));
+ ASSERT_EQ("v0", Get("foo"));
+ ASSERT_OK(Put(1, "Eevee", "v0"));
+ ASSERT_EQ("v0", Get(1, "Eevee"));
+ dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+ // writing to all three cf, flush default cf
+ // LogNumbers: default: 14, stats: 4, pikachu: 4
+ ASSERT_OK(Flush());
+ ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+ ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+
+ ASSERT_OK(Put("foo1", "v1"));
+ ASSERT_OK(Put("bar1", "v1"));
+ ASSERT_EQ("v1", Get("bar1"));
+ ASSERT_EQ("v1", Get("foo1"));
+ ASSERT_OK(Put(1, "Vaporeon", "v1"));
+ ASSERT_EQ("v1", Get(1, "Vaporeon"));
+ // writing to default and test cf, flush test cf
+ // LogNumbers: default: 14, stats: 16, pikachu: 16
+ ASSERT_OK(Flush(1));
+ ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+ ASSERT_GT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+
+ ASSERT_OK(Put("foo2", "v2"));
+ ASSERT_OK(Put("bar2", "v2"));
+ ASSERT_EQ("v2", Get("bar2"));
+ ASSERT_EQ("v2", Get("foo2"));
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(10); });
+ // writing to default and stats cf, flushing default cf
+ // LogNumbers: default: 19, stats: 19, pikachu: 19
+ ASSERT_OK(Flush());
+ ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+ ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+
+ ASSERT_OK(Put("foo3", "v3"));
+ ASSERT_OK(Put("bar3", "v3"));
+ ASSERT_EQ("v3", Get("bar3"));
+ ASSERT_EQ("v3", Get("foo3"));
+ ASSERT_OK(Put(1, "Jolteon", "v3"));
+ ASSERT_EQ("v3", Get(1, "Jolteon"));
+ dbfull()->TEST_WaitForPersistStatsRun(
+ [&] { mock_env->set_current_time(15); });
+ // writing to all three cf, flushing test cf
+ // LogNumbers: default: 19, stats: 19, pikachu: 22
+ ASSERT_OK(Flush(1));
+ ASSERT_LT(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+ ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+ Close();
+}
+
+#endif // !ROCKSDB_LITE
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/src/rocksdb/monitoring/thread_status_impl.cc b/src/rocksdb/monitoring/thread_status_impl.cc
new file mode 100644
index 000000000..9619dfd81
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_impl.cc
@@ -0,0 +1,163 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#include <sstream>
+
+#include "rocksdb/env.h"
+#include "rocksdb/thread_status.h"
+#include "util/string_util.h"
+#include "util/thread_operation.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifdef ROCKSDB_USING_THREAD_STATUS
+std::string ThreadStatus::GetThreadTypeName(
+ ThreadStatus::ThreadType thread_type) {
+ switch (thread_type) {
+ case ThreadStatus::ThreadType::HIGH_PRIORITY:
+ return "High Pri";
+ case ThreadStatus::ThreadType::LOW_PRIORITY:
+ return "Low Pri";
+ case ThreadStatus::ThreadType::USER:
+ return "User";
+ case ThreadStatus::ThreadType::BOTTOM_PRIORITY:
+ return "Bottom Pri";
+ case ThreadStatus::ThreadType::NUM_THREAD_TYPES:
+ assert(false);
+ }
+ return "Unknown";
+}
+
+const std::string& ThreadStatus::GetOperationName(
+ ThreadStatus::OperationType op_type) {
+ if (op_type < 0 || op_type >= NUM_OP_TYPES) {
+ return global_operation_table[OP_UNKNOWN].name;
+ }
+ return global_operation_table[op_type].name;
+}
+
+const std::string& ThreadStatus::GetOperationStageName(
+ ThreadStatus::OperationStage stage) {
+ if (stage < 0 || stage >= NUM_OP_STAGES) {
+ return global_op_stage_table[STAGE_UNKNOWN].name;
+ }
+ return global_op_stage_table[stage].name;
+}
+
+const std::string& ThreadStatus::GetStateName(
+ ThreadStatus::StateType state_type) {
+ if (state_type < 0 || state_type >= NUM_STATE_TYPES) {
+ return global_state_table[STATE_UNKNOWN].name;
+ }
+ return global_state_table[state_type].name;
+}
+
+const std::string ThreadStatus::MicrosToString(uint64_t micros) {
+ if (micros == 0) {
+ return "";
+ }
+ const int kBufferLen = 100;
+ char buffer[kBufferLen];
+ AppendHumanMicros(micros, buffer, kBufferLen, false);
+ return std::string(buffer);
+}
+
+const std::string& ThreadStatus::GetOperationPropertyName(
+ ThreadStatus::OperationType op_type, int i) {
+ static const std::string empty_str = "";
+ switch (op_type) {
+ case ThreadStatus::OP_COMPACTION:
+ if (i >= NUM_COMPACTION_PROPERTIES) {
+ return empty_str;
+ }
+ return compaction_operation_properties[i].name;
+ case ThreadStatus::OP_FLUSH:
+ if (i >= NUM_FLUSH_PROPERTIES) {
+ return empty_str;
+ }
+ return flush_operation_properties[i].name;
+ default:
+ return empty_str;
+ }
+}
+
+std::map<std::string, uint64_t> ThreadStatus::InterpretOperationProperties(
+ ThreadStatus::OperationType op_type, const uint64_t* op_properties) {
+ int num_properties;
+ switch (op_type) {
+ case OP_COMPACTION:
+ num_properties = NUM_COMPACTION_PROPERTIES;
+ break;
+ case OP_FLUSH:
+ num_properties = NUM_FLUSH_PROPERTIES;
+ break;
+ default:
+ num_properties = 0;
+ }
+
+ std::map<std::string, uint64_t> property_map;
+ for (int i = 0; i < num_properties; ++i) {
+ if (op_type == OP_COMPACTION && i == COMPACTION_INPUT_OUTPUT_LEVEL) {
+ property_map.insert({"BaseInputLevel", op_properties[i] >> 32});
+ property_map.insert(
+ {"OutputLevel", op_properties[i] % (uint64_t(1) << 32U)});
+ } else if (op_type == OP_COMPACTION && i == COMPACTION_PROP_FLAGS) {
+ property_map.insert({"IsManual", ((op_properties[i] & 2) >> 1)});
+ property_map.insert({"IsDeletion", ((op_properties[i] & 4) >> 2)});
+ property_map.insert({"IsTrivialMove", ((op_properties[i] & 8) >> 3)});
+ } else {
+ property_map.insert(
+ {GetOperationPropertyName(op_type, i), op_properties[i]});
+ }
+ }
+ return property_map;
+}
+
+#else
+
+std::string ThreadStatus::GetThreadTypeName(
+ ThreadStatus::ThreadType /*thread_type*/) {
+ static std::string dummy_str = "";
+ return dummy_str;
+}
+
+const std::string& ThreadStatus::GetOperationName(
+ ThreadStatus::OperationType /*op_type*/) {
+ static std::string dummy_str = "";
+ return dummy_str;
+}
+
+const std::string& ThreadStatus::GetOperationStageName(
+ ThreadStatus::OperationStage /*stage*/) {
+ static std::string dummy_str = "";
+ return dummy_str;
+}
+
+const std::string& ThreadStatus::GetStateName(
+ ThreadStatus::StateType /*state_type*/) {
+ static std::string dummy_str = "";
+ return dummy_str;
+}
+
+const std::string ThreadStatus::MicrosToString(uint64_t /*op_elapsed_time*/) {
+ static std::string dummy_str = "";
+ return dummy_str;
+}
+
+const std::string& ThreadStatus::GetOperationPropertyName(
+ ThreadStatus::OperationType /*op_type*/, int /*i*/) {
+ static std::string dummy_str = "";
+ return dummy_str;
+}
+
+std::map<std::string, uint64_t> ThreadStatus::InterpretOperationProperties(
+ ThreadStatus::OperationType /*op_type*/,
+ const uint64_t* /*op_properties*/) {
+ return std::map<std::string, uint64_t>();
+}
+
+#endif // ROCKSDB_USING_THREAD_STATUS
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/thread_status_updater.cc b/src/rocksdb/monitoring/thread_status_updater.cc
new file mode 100644
index 000000000..7e4b299a8
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_updater.cc
@@ -0,0 +1,314 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "monitoring/thread_status_updater.h"
+#include <memory>
+#include "port/likely.h"
+#include "rocksdb/env.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifdef ROCKSDB_USING_THREAD_STATUS
+
+__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
+
+void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype,
+ uint64_t thread_id) {
+ if (UNLIKELY(thread_status_data_ == nullptr)) {
+ thread_status_data_ = new ThreadStatusData();
+ thread_status_data_->thread_type = ttype;
+ thread_status_data_->thread_id = thread_id;
+ std::lock_guard<std::mutex> lck(thread_list_mutex_);
+ thread_data_set_.insert(thread_status_data_);
+ }
+
+ ClearThreadOperationProperties();
+}
+
+void ThreadStatusUpdater::UnregisterThread() {
+ if (thread_status_data_ != nullptr) {
+ std::lock_guard<std::mutex> lck(thread_list_mutex_);
+ thread_data_set_.erase(thread_status_data_);
+ delete thread_status_data_;
+ thread_status_data_ = nullptr;
+ }
+}
+
+void ThreadStatusUpdater::ResetThreadStatus() {
+ ClearThreadState();
+ ClearThreadOperation();
+ SetColumnFamilyInfoKey(nullptr);
+}
+
+void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
+ auto* data = Get();
+ if (data == nullptr) {
+ return;
+ }
+ // set the tracking flag based on whether cf_key is non-null or not.
+ // If enable_thread_tracking is set to false, the input cf_key
+ // would be nullptr.
+ data->enable_tracking = (cf_key != nullptr);
+ data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
+}
+
+const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return nullptr;
+ }
+ return data->cf_key.load(std::memory_order_relaxed);
+}
+
+void ThreadStatusUpdater::SetThreadOperation(
+ const ThreadStatus::OperationType type) {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ // NOTE: Our practice here is to set all the thread operation properties
+ // and stage before we set thread operation, and thread operation
+ // will be set in std::memory_order_release. This is to ensure
+ // whenever a thread operation is not OP_UNKNOWN, we will always
+ // have a consistent information on its properties.
+ data->operation_type.store(type, std::memory_order_release);
+ if (type == ThreadStatus::OP_UNKNOWN) {
+ data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
+ std::memory_order_relaxed);
+ ClearThreadOperationProperties();
+ }
+}
+
+void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ data->op_properties[i].store(value, std::memory_order_relaxed);
+}
+
+void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i,
+ uint64_t delta) {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
+}
+
+void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ data->op_start_time.store(start_time, std::memory_order_relaxed);
+}
+
+void ThreadStatusUpdater::ClearThreadOperation() {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
+ std::memory_order_relaxed);
+ data->operation_type.store(ThreadStatus::OP_UNKNOWN,
+ std::memory_order_relaxed);
+ ClearThreadOperationProperties();
+}
+
+void ThreadStatusUpdater::ClearThreadOperationProperties() {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
+ data->op_properties[i].store(0, std::memory_order_relaxed);
+ }
+}
+
+ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
+ ThreadStatus::OperationStage stage) {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return ThreadStatus::STAGE_UNKNOWN;
+ }
+ return data->operation_stage.exchange(stage, std::memory_order_relaxed);
+}
+
+void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ data->state_type.store(type, std::memory_order_relaxed);
+}
+
+void ThreadStatusUpdater::ClearThreadState() {
+ auto* data = GetLocalThreadStatus();
+ if (data == nullptr) {
+ return;
+ }
+ data->state_type.store(ThreadStatus::STATE_UNKNOWN,
+ std::memory_order_relaxed);
+}
+
+Status ThreadStatusUpdater::GetThreadList(
+ std::vector<ThreadStatus>* thread_list) {
+ thread_list->clear();
+ std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
+ uint64_t now_micros = Env::Default()->NowMicros();
+
+ std::lock_guard<std::mutex> lck(thread_list_mutex_);
+ for (auto* thread_data : thread_data_set_) {
+ assert(thread_data);
+ auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
+ auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
+ // Since any change to cf_info_map requires thread_list_mutex,
+ // which is currently held by GetThreadList(), here we can safely
+ // use "memory_order_relaxed" to load the cf_key.
+ auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);
+
+ ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
+ ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
+ ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
+ uint64_t op_elapsed_micros = 0;
+ uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};
+
+ auto iter = cf_info_map_.find(cf_key);
+ if (iter != cf_info_map_.end()) {
+ op_type = thread_data->operation_type.load(std::memory_order_acquire);
+ // display lower-level info only when higher-level info is available.
+ if (op_type != ThreadStatus::OP_UNKNOWN) {
+ op_elapsed_micros = now_micros - thread_data->op_start_time.load(
+ std::memory_order_relaxed);
+ op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
+ state_type = thread_data->state_type.load(std::memory_order_relaxed);
+ for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
+ op_props[i] =
+ thread_data->op_properties[i].load(std::memory_order_relaxed);
+ }
+ }
+ }
+
+ thread_list->emplace_back(
+ thread_id, thread_type,
+ iter != cf_info_map_.end() ? iter->second.db_name : "",
+ iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
+ op_elapsed_micros, op_stage, op_props, state_type);
+ }
+
+ return Status::OK();
+}
+
+ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
+ if (thread_status_data_ == nullptr) {
+ return nullptr;
+ }
+ if (!thread_status_data_->enable_tracking) {
+ assert(thread_status_data_->cf_key.load(std::memory_order_relaxed) ==
+ nullptr);
+ return nullptr;
+ }
+ return thread_status_data_;
+}
+
+void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key,
+ const std::string& db_name,
+ const void* cf_key,
+ const std::string& cf_name) {
+ // Acquiring same lock as GetThreadList() to guarantee
+ // a consistent view of global column family table (cf_info_map).
+ std::lock_guard<std::mutex> lck(thread_list_mutex_);
+
+ cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key),
+ std::make_tuple(db_key, db_name, cf_name));
+ db_key_map_[db_key].insert(cf_key);
+}
+
+void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
+ // Acquiring same lock as GetThreadList() to guarantee
+ // a consistent view of global column family table (cf_info_map).
+ std::lock_guard<std::mutex> lck(thread_list_mutex_);
+
+ auto cf_pair = cf_info_map_.find(cf_key);
+ if (cf_pair != cf_info_map_.end()) {
+ // Remove its entry from db_key_map_ by the following steps:
+ // 1. Obtain the entry in db_key_map_ whose set contains cf_key
+ // 2. Remove it from the set.
+ ConstantColumnFamilyInfo& cf_info = cf_pair->second;
+ auto db_pair = db_key_map_.find(cf_info.db_key);
+ assert(db_pair != db_key_map_.end());
+ size_t result __attribute__((__unused__));
+ result = db_pair->second.erase(cf_key);
+ assert(result);
+ cf_info_map_.erase(cf_pair);
+ }
+}
+
+void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
+ // Acquiring same lock as GetThreadList() to guarantee
+ // a consistent view of global column family table (cf_info_map).
+ std::lock_guard<std::mutex> lck(thread_list_mutex_);
+ auto db_pair = db_key_map_.find(db_key);
+ if (UNLIKELY(db_pair == db_key_map_.end())) {
+ // In some occasional cases such as DB::Open fails, we won't
+ // register ColumnFamilyInfo for a db.
+ return;
+ }
+
+ for (auto cf_key : db_pair->second) {
+ auto cf_pair = cf_info_map_.find(cf_key);
+ if (cf_pair != cf_info_map_.end()) {
+ cf_info_map_.erase(cf_pair);
+ }
+ }
+ db_key_map_.erase(db_key);
+}
+
+#else
+
+void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
+ uint64_t /*thread_id*/) {}
+
+void ThreadStatusUpdater::UnregisterThread() {}
+
+void ThreadStatusUpdater::ResetThreadStatus() {}
+
+void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {}
+
+void ThreadStatusUpdater::SetThreadOperation(
+ const ThreadStatus::OperationType /*type*/) {}
+
+void ThreadStatusUpdater::ClearThreadOperation() {}
+
+void ThreadStatusUpdater::SetThreadState(
+ const ThreadStatus::StateType /*type*/) {}
+
+void ThreadStatusUpdater::ClearThreadState() {}
+
+Status ThreadStatusUpdater::GetThreadList(
+ std::vector<ThreadStatus>* /*thread_list*/) {
+ return Status::NotSupported(
+ "GetThreadList is not supported in the current running environment.");
+}
+
+void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
+ const std::string& /*db_name*/,
+ const void* /*cf_key*/,
+ const std::string& /*cf_name*/) {}
+
+void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {}
+
+void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {}
+
+void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
+ uint64_t /*value*/) {}
+
+void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
+ uint64_t /*delta*/) {}
+
+#endif // ROCKSDB_USING_THREAD_STATUS
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/thread_status_updater.h b/src/rocksdb/monitoring/thread_status_updater.h
new file mode 100644
index 000000000..ac8c0f35d
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_updater.h
@@ -0,0 +1,233 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// The implementation of ThreadStatus.
+//
+// Note that we make get and set access to ThreadStatusData lockless.
+// As a result, ThreadStatusData as a whole is not atomic. However,
+// we guarantee consistent ThreadStatusData all the time whenever
+// user call GetThreadList(). This consistency guarantee is done
+// by having the following constraint in the internal implementation
+// of set and get order:
+//
+// 1. When reset any information in ThreadStatusData, always start from
+// clearing up the lower-level information first.
+// 2. When setting any information in ThreadStatusData, always start from
+// setting the higher-level information.
+// 3. When returning ThreadStatusData to the user, fields are fetched from
+// higher-level to lower-level. In addition, where there's a nullptr
+// in one field, then all fields that has lower-level than that field
+// should be ignored.
+//
+// The high to low level information would be:
+// thread_id > thread_type > db > cf > operation > state
+//
+// This means user might not always get full information, but whenever
+// returned by the GetThreadList() is guaranteed to be consistent.
+#pragma once
+#include <atomic>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "rocksdb/status.h"
+#include "rocksdb/thread_status.h"
+#include "port/port.h"
+#include "util/thread_operation.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyHandle;
+
+// The structure that keeps constant information about a column family.
+struct ConstantColumnFamilyInfo {
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ public:
+ ConstantColumnFamilyInfo(
+ const void* _db_key,
+ const std::string& _db_name,
+ const std::string& _cf_name) :
+ db_key(_db_key), db_name(_db_name), cf_name(_cf_name) {}
+ const void* db_key;
+ const std::string db_name;
+ const std::string cf_name;
+#endif // ROCKSDB_USING_THREAD_STATUS
+};
+
+// the internal data-structure that is used to reflect the current
+// status of a thread using a set of atomic pointers.
+struct ThreadStatusData {
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ explicit ThreadStatusData() : enable_tracking(false) {
+ thread_id.store(0);
+ thread_type.store(ThreadStatus::USER);
+ cf_key.store(nullptr);
+ operation_type.store(ThreadStatus::OP_UNKNOWN);
+ op_start_time.store(0);
+ state_type.store(ThreadStatus::STATE_UNKNOWN);
+ }
+
+ // A flag to indicate whether the thread tracking is enabled
+ // in the current thread. This value will be updated based on whether
+ // the associated Options::enable_thread_tracking is set to true
+ // in ThreadStatusUtil::SetColumnFamily().
+ //
+ // If set to false, then SetThreadOperation and SetThreadState
+ // will be no-op.
+ bool enable_tracking;
+
+ std::atomic<uint64_t> thread_id;
+ std::atomic<ThreadStatus::ThreadType> thread_type;
+ std::atomic<void*> cf_key;
+ std::atomic<ThreadStatus::OperationType> operation_type;
+ std::atomic<uint64_t> op_start_time;
+ std::atomic<ThreadStatus::OperationStage> operation_stage;
+ std::atomic<uint64_t> op_properties[ThreadStatus::kNumOperationProperties];
+ std::atomic<ThreadStatus::StateType> state_type;
+#endif // ROCKSDB_USING_THREAD_STATUS
+};
+
+// The class that stores and updates the status of the current thread
+// using a thread-local ThreadStatusData.
+//
+// In most of the case, you should use ThreadStatusUtil to update
+// the status of the current thread instead of using ThreadSatusUpdater
+// directly.
+//
+// @see ThreadStatusUtil
+class ThreadStatusUpdater {
+ public:
+ ThreadStatusUpdater() {}
+
+ // Releases all ThreadStatusData of all active threads.
+ virtual ~ThreadStatusUpdater() {}
+
+ // Unregister the current thread.
+ void UnregisterThread();
+
+ // Reset the status of the current thread. This includes resetting
+ // ColumnFamilyInfoKey, ThreadOperation, and ThreadState.
+ void ResetThreadStatus();
+
+ // Set the id of the current thread.
+ void SetThreadID(uint64_t thread_id);
+
+ // Register the current thread for tracking.
+ void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id);
+
+ // Update the column-family info of the current thread by setting
+ // its thread-local pointer of ThreadStateInfo to the correct entry.
+ void SetColumnFamilyInfoKey(const void* cf_key);
+
+ // returns the column family info key.
+ const void* GetColumnFamilyInfoKey();
+
+ // Update the thread operation of the current thread.
+ void SetThreadOperation(const ThreadStatus::OperationType type);
+
+ // The start time of the current thread operation. It is in the format
+ // of micro-seconds since some fixed point in time.
+ void SetOperationStartTime(const uint64_t start_time);
+
+ // Set the "i"th property of the current operation.
+ //
+ // NOTE: Our practice here is to set all the thread operation properties
+ // and stage before we set thread operation, and thread operation
+ // will be set in std::memory_order_release. This is to ensure
+ // whenever a thread operation is not OP_UNKNOWN, we will always
+ // have a consistent information on its properties.
+ void SetThreadOperationProperty(
+ int i, uint64_t value);
+
+ // Increase the "i"th property of the current operation with
+ // the specified delta.
+ void IncreaseThreadOperationProperty(
+ int i, uint64_t delta);
+
+ // Update the thread operation stage of the current thread.
+ ThreadStatus::OperationStage SetThreadOperationStage(
+ const ThreadStatus::OperationStage stage);
+
+ // Clear thread operation of the current thread.
+ void ClearThreadOperation();
+
+ // Reset all thread-operation-properties to 0.
+ void ClearThreadOperationProperties();
+
+ // Update the thread state of the current thread.
+ void SetThreadState(const ThreadStatus::StateType type);
+
+ // Clear the thread state of the current thread.
+ void ClearThreadState();
+
+ // Obtain the status of all active registered threads.
+ Status GetThreadList(
+ std::vector<ThreadStatus>* thread_list);
+
+ // Create an entry in the global ColumnFamilyInfo table for the
+ // specified column family. This function should be called only
+ // when the current thread does not hold db_mutex.
+ void NewColumnFamilyInfo(
+ const void* db_key, const std::string& db_name,
+ const void* cf_key, const std::string& cf_name);
+
+ // Erase all ConstantColumnFamilyInfo that is associated with the
+ // specified db instance. This function should be called only when
+ // the current thread does not hold db_mutex.
+ void EraseDatabaseInfo(const void* db_key);
+
+ // Erase the ConstantColumnFamilyInfo that is associated with the
+ // specified ColumnFamilyData. This function should be called only
+ // when the current thread does not hold db_mutex.
+ void EraseColumnFamilyInfo(const void* cf_key);
+
+ // Verifies whether the input ColumnFamilyHandles matches
+ // the information stored in the current cf_info_map.
+ void TEST_VerifyColumnFamilyInfoMap(
+ const std::vector<ColumnFamilyHandle*>& handles,
+ bool check_exist);
+
+ protected:
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ // The thread-local variable for storing thread status.
+ static __thread ThreadStatusData* thread_status_data_;
+
+ // Returns the pointer to the thread status data only when the
+ // thread status data is non-null and has enable_tracking == true.
+ ThreadStatusData* GetLocalThreadStatus();
+
+ // Directly returns the pointer to thread_status_data_ without
+ // checking whether enabling_tracking is true of not.
+ ThreadStatusData* Get() {
+ return thread_status_data_;
+ }
+
+ // The mutex that protects cf_info_map and db_key_map.
+ std::mutex thread_list_mutex_;
+
+ // The current status data of all active threads.
+ std::unordered_set<ThreadStatusData*> thread_data_set_;
+
+ // A global map that keeps the column family information. It is stored
+ // globally instead of inside DB is to avoid the situation where DB is
+ // closing while GetThreadList function already get the pointer to its
+ // CopnstantColumnFamilyInfo.
+ std::unordered_map<const void*, ConstantColumnFamilyInfo> cf_info_map_;
+
+ // A db_key to cf_key map that allows erasing elements in cf_info_map
+ // associated to the same db_key faster.
+ std::unordered_map<
+ const void*, std::unordered_set<const void*>> db_key_map_;
+
+#else
+ static ThreadStatusData* thread_status_data_;
+#endif // ROCKSDB_USING_THREAD_STATUS
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/thread_status_updater_debug.cc b/src/rocksdb/monitoring/thread_status_updater_debug.cc
new file mode 100644
index 000000000..5937d91fa
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_updater_debug.cc
@@ -0,0 +1,42 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <mutex>
+
+#include "db/column_family.h"
+#include "monitoring/thread_status_updater.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifndef NDEBUG
+#ifdef ROCKSDB_USING_THREAD_STATUS
+void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap(
+ const std::vector<ColumnFamilyHandle*>& handles, bool check_exist) {
+ std::unique_lock<std::mutex> lock(thread_list_mutex_);
+ if (check_exist) {
+ assert(cf_info_map_.size() == handles.size());
+ }
+ for (auto* handle : handles) {
+ auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
+ auto iter __attribute__((__unused__)) = cf_info_map_.find(cfd);
+ if (check_exist) {
+ assert(iter != cf_info_map_.end());
+ assert(iter->second.cf_name == cfd->GetName());
+ } else {
+ assert(iter == cf_info_map_.end());
+ }
+ }
+}
+
+#else
+
+void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap(
+ const std::vector<ColumnFamilyHandle*>& /*handles*/, bool /*check_exist*/) {
+}
+
+#endif // ROCKSDB_USING_THREAD_STATUS
+#endif // !NDEBUG
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/thread_status_util.cc b/src/rocksdb/monitoring/thread_status_util.cc
new file mode 100644
index 000000000..13a79163c
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_util.cc
@@ -0,0 +1,206 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "monitoring/thread_status_util.h"
+
+#include "monitoring/thread_status_updater.h"
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifdef ROCKSDB_USING_THREAD_STATUS
+__thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ =
+ nullptr;
+__thread bool ThreadStatusUtil::thread_updater_initialized_ = false;
+
+void ThreadStatusUtil::RegisterThread(const Env* env,
+ ThreadStatus::ThreadType thread_type) {
+ if (!MaybeInitThreadLocalUpdater(env)) {
+ return;
+ }
+ assert(thread_updater_local_cache_);
+ thread_updater_local_cache_->RegisterThread(thread_type, env->GetThreadID());
+}
+
+void ThreadStatusUtil::UnregisterThread() {
+ thread_updater_initialized_ = false;
+ if (thread_updater_local_cache_ != nullptr) {
+ thread_updater_local_cache_->UnregisterThread();
+ thread_updater_local_cache_ = nullptr;
+ }
+}
+
+void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd,
+ const Env* env,
+ bool enable_thread_tracking) {
+ if (!MaybeInitThreadLocalUpdater(env)) {
+ return;
+ }
+ assert(thread_updater_local_cache_);
+ if (cfd != nullptr && enable_thread_tracking) {
+ thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd);
+ } else {
+ // When cfd == nullptr or enable_thread_tracking == false, we set
+ // ColumnFamilyInfoKey to nullptr, which makes SetThreadOperation
+ // and SetThreadState become no-op.
+ thread_updater_local_cache_->SetColumnFamilyInfoKey(nullptr);
+ }
+}
+
+void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) {
+ if (thread_updater_local_cache_ == nullptr) {
+ // thread_updater_local_cache_ must be set in SetColumnFamily
+ // or other ThreadStatusUtil functions.
+ return;
+ }
+
+ if (op != ThreadStatus::OP_UNKNOWN) {
+ uint64_t current_time = Env::Default()->NowMicros();
+ thread_updater_local_cache_->SetOperationStartTime(current_time);
+ } else {
+ // TDOO(yhchiang): we could report the time when we set operation to
+ // OP_UNKNOWN once the whole instrumentation has been done.
+ thread_updater_local_cache_->SetOperationStartTime(0);
+ }
+ thread_updater_local_cache_->SetThreadOperation(op);
+}
+
+ThreadStatus::OperationStage ThreadStatusUtil::SetThreadOperationStage(
+ ThreadStatus::OperationStage stage) {
+ if (thread_updater_local_cache_ == nullptr) {
+ // thread_updater_local_cache_ must be set in SetColumnFamily
+ // or other ThreadStatusUtil functions.
+ return ThreadStatus::STAGE_UNKNOWN;
+ }
+
+ return thread_updater_local_cache_->SetThreadOperationStage(stage);
+}
+
+void ThreadStatusUtil::SetThreadOperationProperty(int code, uint64_t value) {
+ if (thread_updater_local_cache_ == nullptr) {
+ // thread_updater_local_cache_ must be set in SetColumnFamily
+ // or other ThreadStatusUtil functions.
+ return;
+ }
+
+ thread_updater_local_cache_->SetThreadOperationProperty(code, value);
+}
+
+void ThreadStatusUtil::IncreaseThreadOperationProperty(int code,
+ uint64_t delta) {
+ if (thread_updater_local_cache_ == nullptr) {
+ // thread_updater_local_cache_ must be set in SetColumnFamily
+ // or other ThreadStatusUtil functions.
+ return;
+ }
+
+ thread_updater_local_cache_->IncreaseThreadOperationProperty(code, delta);
+}
+
+void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) {
+ if (thread_updater_local_cache_ == nullptr) {
+ // thread_updater_local_cache_ must be set in SetColumnFamily
+ // or other ThreadStatusUtil functions.
+ return;
+ }
+
+ thread_updater_local_cache_->SetThreadState(state);
+}
+
+void ThreadStatusUtil::ResetThreadStatus() {
+ if (thread_updater_local_cache_ == nullptr) {
+ return;
+ }
+ thread_updater_local_cache_->ResetThreadStatus();
+}
+
+void ThreadStatusUtil::NewColumnFamilyInfo(const DB* db,
+ const ColumnFamilyData* cfd,
+ const std::string& cf_name,
+ const Env* env) {
+ if (!MaybeInitThreadLocalUpdater(env)) {
+ return;
+ }
+ assert(thread_updater_local_cache_);
+ if (thread_updater_local_cache_) {
+ thread_updater_local_cache_->NewColumnFamilyInfo(db, db->GetName(), cfd,
+ cf_name);
+ }
+}
+
+void ThreadStatusUtil::EraseColumnFamilyInfo(const ColumnFamilyData* cfd) {
+ if (thread_updater_local_cache_ == nullptr) {
+ return;
+ }
+ thread_updater_local_cache_->EraseColumnFamilyInfo(cfd);
+}
+
+void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) {
+ ThreadStatusUpdater* thread_updater = db->GetEnv()->GetThreadStatusUpdater();
+ if (thread_updater == nullptr) {
+ return;
+ }
+ thread_updater->EraseDatabaseInfo(db);
+}
+
+bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) {
+ if (!thread_updater_initialized_ && env != nullptr) {
+ thread_updater_initialized_ = true;
+ thread_updater_local_cache_ = env->GetThreadStatusUpdater();
+ }
+ return (thread_updater_local_cache_ != nullptr);
+}
+
+AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater(
+ ThreadStatus::OperationStage stage) {
+ prev_stage_ = ThreadStatusUtil::SetThreadOperationStage(stage);
+}
+
+AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {
+ ThreadStatusUtil::SetThreadOperationStage(prev_stage_);
+}
+
+#else
+
+ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
+bool ThreadStatusUtil::thread_updater_initialized_ = false;
+
+bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* /*env*/) {
+ return false;
+}
+
+void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* /*cfd*/,
+ const Env* /*env*/,
+ bool /*enable_thread_tracking*/) {}
+
+void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType /*op*/) {}
+
+void ThreadStatusUtil::SetThreadOperationProperty(int /*code*/,
+ uint64_t /*value*/) {}
+
+void ThreadStatusUtil::IncreaseThreadOperationProperty(int /*code*/,
+ uint64_t /*delta*/) {}
+
+void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType /*state*/) {}
+
+void ThreadStatusUtil::NewColumnFamilyInfo(const DB* /*db*/,
+ const ColumnFamilyData* /*cfd*/,
+ const std::string& /*cf_name*/,
+ const Env* /*env*/) {}
+
+void ThreadStatusUtil::EraseColumnFamilyInfo(const ColumnFamilyData* /*cfd*/) {}
+
+void ThreadStatusUtil::EraseDatabaseInfo(const DB* /*db*/) {}
+
+void ThreadStatusUtil::ResetThreadStatus() {}
+
+AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater(
+ ThreadStatus::OperationStage /*stage*/) {}
+
+AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {}
+
+#endif // ROCKSDB_USING_THREAD_STATUS
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/thread_status_util.h b/src/rocksdb/monitoring/thread_status_util.h
new file mode 100644
index 000000000..b4d97b0b6
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_util.h
@@ -0,0 +1,134 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <string>
+
+#include "monitoring/thread_status_updater.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/thread_status.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyData;
+
+// The static utility class for updating thread-local status.
+//
+// The thread-local status is updated via the thread-local cached
+// pointer thread_updater_local_cache_. During each function call,
+// when ThreadStatusUtil finds thread_updater_local_cache_ is
+// left uninitialized (determined by thread_updater_initialized_),
+// it will tries to initialize it using the return value of
+// Env::GetThreadStatusUpdater(). When thread_updater_local_cache_
+// is initialized by a non-null pointer, each function call will
+// then update the status of the current thread. Otherwise,
+// all function calls to ThreadStatusUtil will be no-op.
+class ThreadStatusUtil {
+ public:
+ // Register the current thread for tracking.
+ static void RegisterThread(
+ const Env* env, ThreadStatus::ThreadType thread_type);
+
+ // Unregister the current thread.
+ static void UnregisterThread();
+
+ // Create an entry in the global ColumnFamilyInfo table for the
+ // specified column family. This function should be called only
+ // when the current thread does not hold db_mutex.
+ static void NewColumnFamilyInfo(const DB* db, const ColumnFamilyData* cfd,
+ const std::string& cf_name, const Env* env);
+
+ // Erase the ConstantColumnFamilyInfo that is associated with the
+ // specified ColumnFamilyData. This function should be called only
+ // when the current thread does not hold db_mutex.
+ static void EraseColumnFamilyInfo(const ColumnFamilyData* cfd);
+
+ // Erase all ConstantColumnFamilyInfo that is associated with the
+ // specified db instance. This function should be called only when
+ // the current thread does not hold db_mutex.
+ static void EraseDatabaseInfo(const DB* db);
+
+ // Update the thread status to indicate the current thread is doing
+ // something related to the specified column family.
+ static void SetColumnFamily(const ColumnFamilyData* cfd, const Env* env,
+ bool enable_thread_tracking);
+
+ static void SetThreadOperation(ThreadStatus::OperationType type);
+
+ static ThreadStatus::OperationStage SetThreadOperationStage(
+ ThreadStatus::OperationStage stage);
+
+ static void SetThreadOperationProperty(
+ int code, uint64_t value);
+
+ static void IncreaseThreadOperationProperty(
+ int code, uint64_t delta);
+
+ static void SetThreadState(ThreadStatus::StateType type);
+
+ static void ResetThreadStatus();
+
+#ifndef NDEBUG
+ static void TEST_SetStateDelay(
+ const ThreadStatus::StateType state, int micro);
+ static void TEST_StateDelay(const ThreadStatus::StateType state);
+#endif
+
+ protected:
+ // Initialize the thread-local ThreadStatusUpdater when it finds
+ // the cached value is nullptr. Returns true if it has cached
+ // a non-null pointer.
+ static bool MaybeInitThreadLocalUpdater(const Env* env);
+
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ // A boolean flag indicating whether thread_updater_local_cache_
+ // is initialized. It is set to true when an Env uses any
+ // ThreadStatusUtil functions using the current thread other
+ // than UnregisterThread(). It will be set to false when
+ // UnregisterThread() is called.
+ //
+ // When this variable is set to true, thread_updater_local_cache_
+ // will not be updated until this variable is again set to false
+ // in UnregisterThread().
+ static __thread bool thread_updater_initialized_;
+
+ // The thread-local cached ThreadStatusUpdater that caches the
+ // thread_status_updater_ of the first Env that uses any ThreadStatusUtil
+ // function other than UnregisterThread(). This variable will
+ // be cleared when UnregisterThread() is called.
+ //
+ // When this variable is set to a non-null pointer, then the status
+ // of the current thread will be updated when a function of
+ // ThreadStatusUtil is called. Otherwise, all functions of
+ // ThreadStatusUtil will be no-op.
+ //
+ // When thread_updater_initialized_ is set to true, this variable
+ // will not be updated until this thread_updater_initialized_ is
+ // again set to false in UnregisterThread().
+ static __thread ThreadStatusUpdater* thread_updater_local_cache_;
+#else
+ static bool thread_updater_initialized_;
+ static ThreadStatusUpdater* thread_updater_local_cache_;
+#endif
+};
+
+// A helper class for updating thread state. It will set the
+// thread state according to the input parameter in its constructor
+// and set the thread state to the previous state in its destructor.
+class AutoThreadOperationStageUpdater {
+ public:
+ explicit AutoThreadOperationStageUpdater(
+ ThreadStatus::OperationStage stage);
+ ~AutoThreadOperationStageUpdater();
+
+#ifdef ROCKSDB_USING_THREAD_STATUS
+ private:
+ ThreadStatus::OperationStage prev_stage_;
+#endif
+};
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/monitoring/thread_status_util_debug.cc b/src/rocksdb/monitoring/thread_status_util_debug.cc
new file mode 100644
index 000000000..375fe8c0a
--- /dev/null
+++ b/src/rocksdb/monitoring/thread_status_util_debug.cc
@@ -0,0 +1,32 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <atomic>
+
+#include "monitoring/thread_status_updater.h"
+#include "monitoring/thread_status_util.h"
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifndef NDEBUG
+// the delay for debugging purpose.
+static std::atomic<int> states_delay[ThreadStatus::NUM_STATE_TYPES];
+
+void ThreadStatusUtil::TEST_SetStateDelay(
+ const ThreadStatus::StateType state, int micro) {
+ states_delay[state].store(micro, std::memory_order_relaxed);
+}
+
+void ThreadStatusUtil::TEST_StateDelay(const ThreadStatus::StateType state) {
+ auto delay = states_delay[state].load(std::memory_order_relaxed);
+ if (delay > 0) {
+ Env::Default()->SleepForMicroseconds(delay);
+ }
+}
+
+#endif // !NDEBUG
+
+} // namespace ROCKSDB_NAMESPACE