diff options
Diffstat (limited to '')
31 files changed, 4806 insertions, 0 deletions
diff --git a/src/rocksdb/monitoring/file_read_sample.h b/src/rocksdb/monitoring/file_read_sample.h new file mode 100644 index 000000000..82a933e0a --- /dev/null +++ b/src/rocksdb/monitoring/file_read_sample.h @@ -0,0 +1,23 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "db/version_edit.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { +static const uint32_t kFileReadSampleRate = 1024; +extern bool should_sample_file_read(); +extern void sample_file_read_inc(FileMetaData*); + +inline bool should_sample_file_read() { + return (Random::GetTLSInstance()->Next() % kFileReadSampleRate == 307); +} + +inline void sample_file_read_inc(FileMetaData* meta) { + meta->stats.num_reads_sampled.fetch_add(kFileReadSampleRate, + std::memory_order_relaxed); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/histogram.cc b/src/rocksdb/monitoring/histogram.cc new file mode 100644 index 000000000..55339f888 --- /dev/null +++ b/src/rocksdb/monitoring/histogram.cc @@ -0,0 +1,288 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "monitoring/histogram.h" + +#include <stdio.h> +#include <cassert> +#include <cinttypes> +#include <cmath> + +#include "port/port.h" +#include "util/cast_util.h" + +namespace ROCKSDB_NAMESPACE { + +HistogramBucketMapper::HistogramBucketMapper() { + // If you change this, you also need to change + // size of array buckets_ in HistogramImpl + bucketValues_ = {1, 2}; + valueIndexMap_ = {{1, 0}, {2, 1}}; + double bucket_val = static_cast<double>(bucketValues_.back()); + while ((bucket_val = 1.5 * bucket_val) <= static_cast<double>(port::kMaxUint64)) { + bucketValues_.push_back(static_cast<uint64_t>(bucket_val)); + // Extracts two most significant digits to make histogram buckets more + // human-readable. E.g., 172 becomes 170. + uint64_t pow_of_ten = 1; + while (bucketValues_.back() / 10 > 10) { + bucketValues_.back() /= 10; + pow_of_ten *= 10; + } + bucketValues_.back() *= pow_of_ten; + valueIndexMap_[bucketValues_.back()] = bucketValues_.size() - 1; + } + maxBucketValue_ = bucketValues_.back(); + minBucketValue_ = bucketValues_.front(); +} + +size_t HistogramBucketMapper::IndexForValue(const uint64_t value) const { + if (value >= maxBucketValue_) { + return bucketValues_.size() - 1; + } else if ( value >= minBucketValue_ ) { + std::map<uint64_t, uint64_t>::const_iterator lowerBound = + valueIndexMap_.lower_bound(value); + if (lowerBound != valueIndexMap_.end()) { + return static_cast<size_t>(lowerBound->second); + } else { + return 0; + } + } else { + return 0; + } +} + +namespace { + const HistogramBucketMapper bucketMapper; +} + +HistogramStat::HistogramStat() + : num_buckets_(bucketMapper.BucketCount()) { + assert(num_buckets_ == sizeof(buckets_) / sizeof(*buckets_)); + Clear(); +} + +void HistogramStat::Clear() { + min_.store(bucketMapper.LastValue(), std::memory_order_relaxed); + max_.store(0, std::memory_order_relaxed); + num_.store(0, std::memory_order_relaxed); + sum_.store(0, std::memory_order_relaxed); + sum_squares_.store(0, std::memory_order_relaxed); + for (unsigned int b = 0; b < num_buckets_; b++) { + buckets_[b].store(0, std::memory_order_relaxed); + } +}; + +bool HistogramStat::Empty() const { return num() == 0; } + +void HistogramStat::Add(uint64_t value) { + // This function is designed to be lock free, as it's in the critical path + // of any operation. Each individual value is atomic and the order of updates + // by concurrent threads is tolerable. + const size_t index = bucketMapper.IndexForValue(value); + assert(index < num_buckets_); + buckets_[index].store(buckets_[index].load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + + uint64_t old_min = min(); + if (value < old_min) { + min_.store(value, std::memory_order_relaxed); + } + + uint64_t old_max = max(); + if (value > old_max) { + max_.store(value, std::memory_order_relaxed); + } + + num_.store(num_.load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + sum_.store(sum_.load(std::memory_order_relaxed) + value, + std::memory_order_relaxed); + sum_squares_.store( + sum_squares_.load(std::memory_order_relaxed) + value * value, + std::memory_order_relaxed); +} + +void HistogramStat::Merge(const HistogramStat& other) { + // This function needs to be performned with the outer lock acquired + // However, atomic operation on every member is still need, since Add() + // requires no lock and value update can still happen concurrently + uint64_t old_min = min(); + uint64_t other_min = other.min(); + while (other_min < old_min && + !min_.compare_exchange_weak(old_min, other_min)) {} + + uint64_t old_max = max(); + uint64_t other_max = other.max(); + while (other_max > old_max && + !max_.compare_exchange_weak(old_max, other_max)) {} + + num_.fetch_add(other.num(), std::memory_order_relaxed); + sum_.fetch_add(other.sum(), std::memory_order_relaxed); + sum_squares_.fetch_add(other.sum_squares(), std::memory_order_relaxed); + for (unsigned int b = 0; b < num_buckets_; b++) { + buckets_[b].fetch_add(other.bucket_at(b), std::memory_order_relaxed); + } +} + +double HistogramStat::Median() const { + return Percentile(50.0); +} + +double HistogramStat::Percentile(double p) const { + double threshold = num() * (p / 100.0); + uint64_t cumulative_sum = 0; + for (unsigned int b = 0; b < num_buckets_; b++) { + uint64_t bucket_value = bucket_at(b); + cumulative_sum += bucket_value; + if (cumulative_sum >= threshold) { + // Scale linearly within this bucket + uint64_t left_point = (b == 0) ? 0 : bucketMapper.BucketLimit(b-1); + uint64_t right_point = bucketMapper.BucketLimit(b); + uint64_t left_sum = cumulative_sum - bucket_value; + uint64_t right_sum = cumulative_sum; + double pos = 0; + uint64_t right_left_diff = right_sum - left_sum; + if (right_left_diff != 0) { + pos = (threshold - left_sum) / right_left_diff; + } + double r = left_point + (right_point - left_point) * pos; + uint64_t cur_min = min(); + uint64_t cur_max = max(); + if (r < cur_min) r = static_cast<double>(cur_min); + if (r > cur_max) r = static_cast<double>(cur_max); + return r; + } + } + return static_cast<double>(max()); +} + +double HistogramStat::Average() const { + uint64_t cur_num = num(); + uint64_t cur_sum = sum(); + if (cur_num == 0) return 0; + return static_cast<double>(cur_sum) / static_cast<double>(cur_num); +} + +double HistogramStat::StandardDeviation() const { + uint64_t cur_num = num(); + uint64_t cur_sum = sum(); + uint64_t cur_sum_squares = sum_squares(); + if (cur_num == 0) return 0; + double variance = + static_cast<double>(cur_sum_squares * cur_num - cur_sum * cur_sum) / + static_cast<double>(cur_num * cur_num); + return std::sqrt(variance); +} +std::string HistogramStat::ToString() const { + uint64_t cur_num = num(); + std::string r; + char buf[1650]; + snprintf(buf, sizeof(buf), + "Count: %" PRIu64 " Average: %.4f StdDev: %.2f\n", + cur_num, Average(), StandardDeviation()); + r.append(buf); + snprintf(buf, sizeof(buf), + "Min: %" PRIu64 " Median: %.4f Max: %" PRIu64 "\n", + (cur_num == 0 ? 0 : min()), Median(), (cur_num == 0 ? 0 : max())); + r.append(buf); + snprintf(buf, sizeof(buf), + "Percentiles: " + "P50: %.2f P75: %.2f P99: %.2f P99.9: %.2f P99.99: %.2f\n", + Percentile(50), Percentile(75), Percentile(99), Percentile(99.9), + Percentile(99.99)); + r.append(buf); + r.append("------------------------------------------------------\n"); + if (cur_num == 0) return r; // all buckets are empty + const double mult = 100.0 / cur_num; + uint64_t cumulative_sum = 0; + for (unsigned int b = 0; b < num_buckets_; b++) { + uint64_t bucket_value = bucket_at(b); + if (bucket_value <= 0.0) continue; + cumulative_sum += bucket_value; + snprintf(buf, sizeof(buf), + "%c %7" PRIu64 ", %7" PRIu64 " ] %8" PRIu64 " %7.3f%% %7.3f%% ", + (b == 0) ? '[' : '(', + (b == 0) ? 0 : bucketMapper.BucketLimit(b-1), // left + bucketMapper.BucketLimit(b), // right + bucket_value, // count + (mult * bucket_value), // percentage + (mult * cumulative_sum)); // cumulative percentage + r.append(buf); + + // Add hash marks based on percentage; 20 marks for 100%. + size_t marks = static_cast<size_t>(mult * bucket_value / 5 + 0.5); + r.append(marks, '#'); + r.push_back('\n'); + } + return r; +} + +void HistogramStat::Data(HistogramData * const data) const { + assert(data); + data->median = Median(); + data->percentile95 = Percentile(95); + data->percentile99 = Percentile(99); + data->max = static_cast<double>(max()); + data->average = Average(); + data->standard_deviation = StandardDeviation(); + data->count = num(); + data->sum = sum(); + data->min = static_cast<double>(min()); +} + +void HistogramImpl::Clear() { + std::lock_guard<std::mutex> lock(mutex_); + stats_.Clear(); +} + +bool HistogramImpl::Empty() const { + return stats_.Empty(); +} + +void HistogramImpl::Add(uint64_t value) { + stats_.Add(value); +} + +void HistogramImpl::Merge(const Histogram& other) { + if (strcmp(Name(), other.Name()) == 0) { + Merge( + *static_cast_with_check<const HistogramImpl, const Histogram>(&other)); + } +} + +void HistogramImpl::Merge(const HistogramImpl& other) { + std::lock_guard<std::mutex> lock(mutex_); + stats_.Merge(other.stats_); +} + +double HistogramImpl::Median() const { + return stats_.Median(); +} + +double HistogramImpl::Percentile(double p) const { + return stats_.Percentile(p); +} + +double HistogramImpl::Average() const { + return stats_.Average(); +} + +double HistogramImpl::StandardDeviation() const { + return stats_.StandardDeviation(); +} + +std::string HistogramImpl::ToString() const { + return stats_.ToString(); +} + +void HistogramImpl::Data(HistogramData * const data) const { + stats_.Data(data); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/histogram.h b/src/rocksdb/monitoring/histogram.h new file mode 100644 index 000000000..a6b93e8fd --- /dev/null +++ b/src/rocksdb/monitoring/histogram.h @@ -0,0 +1,149 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include "rocksdb/statistics.h" + +#include <cassert> +#include <string> +#include <vector> +#include <map> +#include <mutex> + +namespace ROCKSDB_NAMESPACE { + +class HistogramBucketMapper { + public: + + HistogramBucketMapper(); + + // converts a value to the bucket index. + size_t IndexForValue(uint64_t value) const; + // number of buckets required. + + size_t BucketCount() const { + return bucketValues_.size(); + } + + uint64_t LastValue() const { + return maxBucketValue_; + } + + uint64_t FirstValue() const { + return minBucketValue_; + } + + uint64_t BucketLimit(const size_t bucketNumber) const { + assert(bucketNumber < BucketCount()); + return bucketValues_[bucketNumber]; + } + + private: + std::vector<uint64_t> bucketValues_; + uint64_t maxBucketValue_; + uint64_t minBucketValue_; + std::map<uint64_t, uint64_t> valueIndexMap_; +}; + +struct HistogramStat { + HistogramStat(); + ~HistogramStat() {} + + HistogramStat(const HistogramStat&) = delete; + HistogramStat& operator=(const HistogramStat&) = delete; + + void Clear(); + bool Empty() const; + void Add(uint64_t value); + void Merge(const HistogramStat& other); + + inline uint64_t min() const { return min_.load(std::memory_order_relaxed); } + inline uint64_t max() const { return max_.load(std::memory_order_relaxed); } + inline uint64_t num() const { return num_.load(std::memory_order_relaxed); } + inline uint64_t sum() const { return sum_.load(std::memory_order_relaxed); } + inline uint64_t sum_squares() const { + return sum_squares_.load(std::memory_order_relaxed); + } + inline uint64_t bucket_at(size_t b) const { + return buckets_[b].load(std::memory_order_relaxed); + } + + double Median() const; + double Percentile(double p) const; + double Average() const; + double StandardDeviation() const; + void Data(HistogramData* const data) const; + std::string ToString() const; + + // To be able to use HistogramStat as thread local variable, it + // cannot have dynamic allocated member. That's why we're + // using manually values from BucketMapper + std::atomic_uint_fast64_t min_; + std::atomic_uint_fast64_t max_; + std::atomic_uint_fast64_t num_; + std::atomic_uint_fast64_t sum_; + std::atomic_uint_fast64_t sum_squares_; + std::atomic_uint_fast64_t buckets_[109]; // 109==BucketMapper::BucketCount() + const uint64_t num_buckets_; +}; + +class Histogram { +public: + Histogram() {} + virtual ~Histogram() {}; + + virtual void Clear() = 0; + virtual bool Empty() const = 0; + virtual void Add(uint64_t value) = 0; + virtual void Merge(const Histogram&) = 0; + + virtual std::string ToString() const = 0; + virtual const char* Name() const = 0; + virtual uint64_t min() const = 0; + virtual uint64_t max() const = 0; + virtual uint64_t num() const = 0; + virtual double Median() const = 0; + virtual double Percentile(double p) const = 0; + virtual double Average() const = 0; + virtual double StandardDeviation() const = 0; + virtual void Data(HistogramData* const data) const = 0; +}; + +class HistogramImpl : public Histogram { + public: + HistogramImpl() { Clear(); } + + HistogramImpl(const HistogramImpl&) = delete; + HistogramImpl& operator=(const HistogramImpl&) = delete; + + virtual void Clear() override; + virtual bool Empty() const override; + virtual void Add(uint64_t value) override; + virtual void Merge(const Histogram& other) override; + void Merge(const HistogramImpl& other); + + virtual std::string ToString() const override; + virtual const char* Name() const override { return "HistogramImpl"; } + virtual uint64_t min() const override { return stats_.min(); } + virtual uint64_t max() const override { return stats_.max(); } + virtual uint64_t num() const override { return stats_.num(); } + virtual double Median() const override; + virtual double Percentile(double p) const override; + virtual double Average() const override; + virtual double StandardDeviation() const override; + virtual void Data(HistogramData* const data) const override; + + virtual ~HistogramImpl() {} + + private: + HistogramStat stats_; + std::mutex mutex_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/histogram_test.cc b/src/rocksdb/monitoring/histogram_test.cc new file mode 100644 index 000000000..36a7d7154 --- /dev/null +++ b/src/rocksdb/monitoring/histogram_test.cc @@ -0,0 +1,221 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include <cmath> + +#include "monitoring/histogram.h" +#include "monitoring/histogram_windowing.h" +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { + +class HistogramTest : public testing::Test {}; + +namespace { + const double kIota = 0.1; + const HistogramBucketMapper bucketMapper; + Env* env = Env::Default(); +} + +void PopulateHistogram(Histogram& histogram, + uint64_t low, uint64_t high, uint64_t loop = 1) { + for (; loop > 0; loop--) { + for (uint64_t i = low; i <= high; i++) { + histogram.Add(i); + } + } +} + +void BasicOperation(Histogram& histogram) { + PopulateHistogram(histogram, 1, 110, 10); // fill up to bucket [70, 110) + + HistogramData data; + histogram.Data(&data); + + ASSERT_LE(fabs(histogram.Percentile(100.0) - 110.0), kIota); + ASSERT_LE(fabs(data.percentile99 - 108.9), kIota); // 99 * 110 / 100 + ASSERT_LE(fabs(data.percentile95 - 104.5), kIota); // 95 * 110 / 100 + ASSERT_LE(fabs(data.median - 55.0), kIota); // 50 * 110 / 100 + ASSERT_EQ(data.average, 55.5); // (1 + 110) / 2 +} + +void MergeHistogram(Histogram& histogram, Histogram& other) { + PopulateHistogram(histogram, 1, 100); + PopulateHistogram(other, 101, 250); + histogram.Merge(other); + + HistogramData data; + histogram.Data(&data); + + ASSERT_LE(fabs(histogram.Percentile(100.0) - 250.0), kIota); + ASSERT_LE(fabs(data.percentile99 - 247.5), kIota); // 99 * 250 / 100 + ASSERT_LE(fabs(data.percentile95 - 237.5), kIota); // 95 * 250 / 100 + ASSERT_LE(fabs(data.median - 125.0), kIota); // 50 * 250 / 100 + ASSERT_EQ(data.average, 125.5); // (1 + 250) / 2 +} + +void EmptyHistogram(Histogram& histogram) { + ASSERT_EQ(histogram.min(), bucketMapper.LastValue()); + ASSERT_EQ(histogram.max(), 0); + ASSERT_EQ(histogram.num(), 0); + ASSERT_EQ(histogram.Median(), 0.0); + ASSERT_EQ(histogram.Percentile(85.0), 0.0); + ASSERT_EQ(histogram.Average(), 0.0); + ASSERT_EQ(histogram.StandardDeviation(), 0.0); +} + +void ClearHistogram(Histogram& histogram) { + for (uint64_t i = 1; i <= 100; i++) { + histogram.Add(i); + } + histogram.Clear(); + ASSERT_TRUE(histogram.Empty()); + ASSERT_EQ(histogram.Median(), 0); + ASSERT_EQ(histogram.Percentile(85.0), 0); + ASSERT_EQ(histogram.Average(), 0); +} + +TEST_F(HistogramTest, BasicOperation) { + HistogramImpl histogram; + BasicOperation(histogram); + + HistogramWindowingImpl histogramWindowing; + BasicOperation(histogramWindowing); +} + +TEST_F(HistogramTest, BoundaryValue) { + HistogramImpl histogram; + // - both should be in [0, 1] bucket because we place values on bucket + // boundaries in the lower bucket. + // - all points are in [0, 1] bucket, so p50 will be 0.5 + // - the test cannot be written with a single point since histogram won't + // report percentiles lower than the min or greater than the max. + histogram.Add(0); + histogram.Add(1); + + ASSERT_LE(fabs(histogram.Percentile(50.0) - 0.5), kIota); +} + +TEST_F(HistogramTest, MergeHistogram) { + HistogramImpl histogram; + HistogramImpl other; + MergeHistogram(histogram, other); + + HistogramWindowingImpl histogramWindowing; + HistogramWindowingImpl otherWindowing; + MergeHistogram(histogramWindowing, otherWindowing); +} + +TEST_F(HistogramTest, EmptyHistogram) { + HistogramImpl histogram; + EmptyHistogram(histogram); + + HistogramWindowingImpl histogramWindowing; + EmptyHistogram(histogramWindowing); +} + +TEST_F(HistogramTest, ClearHistogram) { + HistogramImpl histogram; + ClearHistogram(histogram); + + HistogramWindowingImpl histogramWindowing; + ClearHistogram(histogramWindowing); +} + +TEST_F(HistogramTest, HistogramWindowingExpire) { + uint64_t num_windows = 3; + int micros_per_window = 1000000; + uint64_t min_num_per_window = 0; + + HistogramWindowingImpl + histogramWindowing(num_windows, micros_per_window, min_num_per_window); + + PopulateHistogram(histogramWindowing, 1, 1, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 100); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 1); + ASSERT_EQ(histogramWindowing.Average(), 1); + + PopulateHistogram(histogramWindowing, 2, 2, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 200); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 2); + ASSERT_EQ(histogramWindowing.Average(), 1.5); + + PopulateHistogram(histogramWindowing, 3, 3, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 300); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 3); + ASSERT_EQ(histogramWindowing.Average(), 2.0); + + // dropping oldest window with value 1, remaining 2 ~ 4 + PopulateHistogram(histogramWindowing, 4, 4, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 300); + ASSERT_EQ(histogramWindowing.min(), 2); + ASSERT_EQ(histogramWindowing.max(), 4); + ASSERT_EQ(histogramWindowing.Average(), 3.0); + + // dropping oldest window with value 2, remaining 3 ~ 5 + PopulateHistogram(histogramWindowing, 5, 5, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 300); + ASSERT_EQ(histogramWindowing.min(), 3); + ASSERT_EQ(histogramWindowing.max(), 5); + ASSERT_EQ(histogramWindowing.Average(), 4.0); +} + +TEST_F(HistogramTest, HistogramWindowingMerge) { + uint64_t num_windows = 3; + int micros_per_window = 1000000; + uint64_t min_num_per_window = 0; + + HistogramWindowingImpl + histogramWindowing(num_windows, micros_per_window, min_num_per_window); + HistogramWindowingImpl + otherWindowing(num_windows, micros_per_window, min_num_per_window); + + PopulateHistogram(histogramWindowing, 1, 1, 100); + PopulateHistogram(otherWindowing, 1, 1, 100); + env->SleepForMicroseconds(micros_per_window); + + PopulateHistogram(histogramWindowing, 2, 2, 100); + PopulateHistogram(otherWindowing, 2, 2, 100); + env->SleepForMicroseconds(micros_per_window); + + PopulateHistogram(histogramWindowing, 3, 3, 100); + PopulateHistogram(otherWindowing, 3, 3, 100); + env->SleepForMicroseconds(micros_per_window); + + histogramWindowing.Merge(otherWindowing); + ASSERT_EQ(histogramWindowing.num(), 600); + ASSERT_EQ(histogramWindowing.min(), 1); + ASSERT_EQ(histogramWindowing.max(), 3); + ASSERT_EQ(histogramWindowing.Average(), 2.0); + + // dropping oldest window with value 1, remaining 2 ~ 4 + PopulateHistogram(histogramWindowing, 4, 4, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 500); + ASSERT_EQ(histogramWindowing.min(), 2); + ASSERT_EQ(histogramWindowing.max(), 4); + + // dropping oldest window with value 2, remaining 3 ~ 5 + PopulateHistogram(histogramWindowing, 5, 5, 100); + env->SleepForMicroseconds(micros_per_window); + ASSERT_EQ(histogramWindowing.num(), 400); + ASSERT_EQ(histogramWindowing.min(), 3); + ASSERT_EQ(histogramWindowing.max(), 5); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/monitoring/histogram_windowing.cc b/src/rocksdb/monitoring/histogram_windowing.cc new file mode 100644 index 000000000..253512778 --- /dev/null +++ b/src/rocksdb/monitoring/histogram_windowing.cc @@ -0,0 +1,202 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "monitoring/histogram_windowing.h" +#include "monitoring/histogram.h" +#include "util/cast_util.h" + +#include <algorithm> + +namespace ROCKSDB_NAMESPACE { + +HistogramWindowingImpl::HistogramWindowingImpl() { + env_ = Env::Default(); + window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]); + Clear(); +} + +HistogramWindowingImpl::HistogramWindowingImpl( + uint64_t num_windows, + uint64_t micros_per_window, + uint64_t min_num_per_window) : + num_windows_(num_windows), + micros_per_window_(micros_per_window), + min_num_per_window_(min_num_per_window) { + env_ = Env::Default(); + window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]); + Clear(); +} + +HistogramWindowingImpl::~HistogramWindowingImpl() { +} + +void HistogramWindowingImpl::Clear() { + std::lock_guard<std::mutex> lock(mutex_); + + stats_.Clear(); + for (size_t i = 0; i < num_windows_; i++) { + window_stats_[i].Clear(); + } + current_window_.store(0, std::memory_order_relaxed); + last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed); +} + +bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); } + +// This function is designed to be lock free, as it's in the critical path +// of any operation. +// Each individual value is atomic, it is just that some samples can go +// in the older bucket which is tolerable. +void HistogramWindowingImpl::Add(uint64_t value){ + TimerTick(); + + // Parent (global) member update + stats_.Add(value); + + // Current window update + window_stats_[static_cast<size_t>(current_window())].Add(value); +} + +void HistogramWindowingImpl::Merge(const Histogram& other) { + if (strcmp(Name(), other.Name()) == 0) { + Merge( + *static_cast_with_check<const HistogramWindowingImpl, const Histogram>( + &other)); + } +} + +void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) { + std::lock_guard<std::mutex> lock(mutex_); + stats_.Merge(other.stats_); + + if (stats_.num_buckets_ != other.stats_.num_buckets_ || + micros_per_window_ != other.micros_per_window_) { + return; + } + + uint64_t cur_window = current_window(); + uint64_t other_cur_window = other.current_window(); + // going backwards for alignment + for (unsigned int i = 0; + i < std::min(num_windows_, other.num_windows_); i++) { + uint64_t window_index = + (cur_window + num_windows_ - i) % num_windows_; + uint64_t other_window_index = + (other_cur_window + other.num_windows_ - i) % other.num_windows_; + size_t windex = static_cast<size_t>(window_index); + size_t other_windex = static_cast<size_t>(other_window_index); + + window_stats_[windex].Merge( + other.window_stats_[other_windex]); + } +} + +std::string HistogramWindowingImpl::ToString() const { + return stats_.ToString(); +} + +double HistogramWindowingImpl::Median() const { + return Percentile(50.0); +} + +double HistogramWindowingImpl::Percentile(double p) const { + // Retry 3 times in total + for (int retry = 0; retry < 3; retry++) { + uint64_t start_num = stats_.num(); + double result = stats_.Percentile(p); + // Detect if swap buckets or Clear() was called during calculation + if (stats_.num() >= start_num) { + return result; + } + } + return 0.0; +} + +double HistogramWindowingImpl::Average() const { + return stats_.Average(); +} + +double HistogramWindowingImpl::StandardDeviation() const { + return stats_.StandardDeviation(); +} + +void HistogramWindowingImpl::Data(HistogramData * const data) const { + stats_.Data(data); +} + +void HistogramWindowingImpl::TimerTick() { + uint64_t curr_time = env_->NowMicros(); + size_t curr_window_ = static_cast<size_t>(current_window()); + if (curr_time - last_swap_time() > micros_per_window_ && + window_stats_[curr_window_].num() >= min_num_per_window_) { + SwapHistoryBucket(); + } +} + +void HistogramWindowingImpl::SwapHistoryBucket() { + // Threads executing Add() would be competing for this mutex, the first one + // who got the metex would take care of the bucket swap, other threads + // can skip this. + // If mutex is held by Merge() or Clear(), next Add() will take care of the + // swap, if needed. + if (mutex_.try_lock()) { + last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed); + + uint64_t curr_window = current_window(); + uint64_t next_window = (curr_window == num_windows_ - 1) ? + 0 : curr_window + 1; + + // subtract next buckets from totals and swap to next buckets + HistogramStat& stats_to_drop = + window_stats_[static_cast<size_t>(next_window)]; + + if (!stats_to_drop.Empty()) { + for (size_t b = 0; b < stats_.num_buckets_; b++){ + stats_.buckets_[b].fetch_sub( + stats_to_drop.bucket_at(b), std::memory_order_relaxed); + } + + if (stats_.min() == stats_to_drop.min()) { + uint64_t new_min = std::numeric_limits<uint64_t>::max(); + for (unsigned int i = 0; i < num_windows_; i++) { + if (i != next_window) { + uint64_t m = window_stats_[i].min(); + if (m < new_min) new_min = m; + } + } + stats_.min_.store(new_min, std::memory_order_relaxed); + } + + if (stats_.max() == stats_to_drop.max()) { + uint64_t new_max = 0; + for (unsigned int i = 0; i < num_windows_; i++) { + if (i != next_window) { + uint64_t m = window_stats_[i].max(); + if (m > new_max) new_max = m; + } + } + stats_.max_.store(new_max, std::memory_order_relaxed); + } + + stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed); + stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed); + stats_.sum_squares_.fetch_sub( + stats_to_drop.sum_squares(), std::memory_order_relaxed); + + stats_to_drop.Clear(); + } + + // advance to next window bucket + current_window_.store(next_window, std::memory_order_relaxed); + + mutex_.unlock(); + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/histogram_windowing.h b/src/rocksdb/monitoring/histogram_windowing.h new file mode 100644 index 000000000..72545b07f --- /dev/null +++ b/src/rocksdb/monitoring/histogram_windowing.h @@ -0,0 +1,80 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "monitoring/histogram.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +class HistogramWindowingImpl : public Histogram +{ +public: + HistogramWindowingImpl(); + HistogramWindowingImpl(uint64_t num_windows, + uint64_t micros_per_window, + uint64_t min_num_per_window); + + HistogramWindowingImpl(const HistogramWindowingImpl&) = delete; + HistogramWindowingImpl& operator=(const HistogramWindowingImpl&) = delete; + + ~HistogramWindowingImpl(); + + virtual void Clear() override; + virtual bool Empty() const override; + virtual void Add(uint64_t value) override; + virtual void Merge(const Histogram& other) override; + void Merge(const HistogramWindowingImpl& other); + + virtual std::string ToString() const override; + virtual const char* Name() const override { return "HistogramWindowingImpl"; } + virtual uint64_t min() const override { return stats_.min(); } + virtual uint64_t max() const override { return stats_.max(); } + virtual uint64_t num() const override { return stats_.num(); } + virtual double Median() const override; + virtual double Percentile(double p) const override; + virtual double Average() const override; + virtual double StandardDeviation() const override; + virtual void Data(HistogramData* const data) const override; + +private: + void TimerTick(); + void SwapHistoryBucket(); + inline uint64_t current_window() const { + return current_window_.load(std::memory_order_relaxed); + } + inline uint64_t last_swap_time() const{ + return last_swap_time_.load(std::memory_order_relaxed); + } + + Env* env_; + std::mutex mutex_; + + // Aggregated stats over windows_stats_, all the computation is done + // upon aggregated values + HistogramStat stats_; + + // This is a circular array representing the latest N time-windows. + // Each entry stores a time-window of data. Expiration is done + // on window-based. + std::unique_ptr<HistogramStat[]> window_stats_; + + std::atomic_uint_fast64_t current_window_; + std::atomic_uint_fast64_t last_swap_time_; + + // Following parameters are configuable + uint64_t num_windows_ = 5; + uint64_t micros_per_window_ = 60000000; + // By default, don't care about the number of values in current window + // when decide whether to swap windows or not. + uint64_t min_num_per_window_ = 0; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/in_memory_stats_history.cc b/src/rocksdb/monitoring/in_memory_stats_history.cc new file mode 100644 index 000000000..dba791e2b --- /dev/null +++ b/src/rocksdb/monitoring/in_memory_stats_history.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "monitoring/in_memory_stats_history.h" +#include "db/db_impl/db_impl.h" + +namespace ROCKSDB_NAMESPACE { + +InMemoryStatsHistoryIterator::~InMemoryStatsHistoryIterator() {} + +bool InMemoryStatsHistoryIterator::Valid() const { return valid_; } + +Status InMemoryStatsHistoryIterator::status() const { return status_; } + +// Because of garbage collection, the next stats snapshot may or may not be +// right after the current one. When reading from DBImpl::stats_history_, this +// call will be protected by DB Mutex so it will not return partial or +// corrupted results. +void InMemoryStatsHistoryIterator::Next() { + // increment start_time by 1 to avoid infinite loop + AdvanceIteratorByTime(GetStatsTime() + 1, end_time_); +} + +uint64_t InMemoryStatsHistoryIterator::GetStatsTime() const { return time_; } + +const std::map<std::string, uint64_t>& +InMemoryStatsHistoryIterator::GetStatsMap() const { + return stats_map_; +} + +// advance the iterator to the next time between [start_time, end_time) +// if success, update time_ and stats_map_ with new_time and stats_map +void InMemoryStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time, + uint64_t end_time) { + // try to find next entry in stats_history_ map + if (db_impl_ != nullptr) { + valid_ = + db_impl_->FindStatsByTime(start_time, end_time, &time_, &stats_map_); + } else { + valid_ = false; + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/in_memory_stats_history.h b/src/rocksdb/monitoring/in_memory_stats_history.h new file mode 100644 index 000000000..3be864fe2 --- /dev/null +++ b/src/rocksdb/monitoring/in_memory_stats_history.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/stats_history.h" + +namespace ROCKSDB_NAMESPACE { + +// InMemoryStatsHistoryIterator can be used to access stats history that was +// stored by an in-memory two level std::map(DBImpl::stats_history_). It keeps +// a copy of the stats snapshot (in stats_map_) that is currently being pointed +// to, which allows the iterator to access the stats snapshot even when +// the background garbage collecting thread purges it from the source of truth +// (`DBImpl::stats_history_`). In that case, the iterator will continue to be +// valid until a call to `Next()` returns no result and invalidates it. In +// some extreme cases, the iterator may also return fragmented segments of +// stats snapshots due to long gaps between `Next()` calls and interleaved +// garbage collection. +class InMemoryStatsHistoryIterator final : public StatsHistoryIterator { + public: + // Setup InMemoryStatsHistoryIterator to return stats snapshots between + // seconds timestamps [start_time, end_time) + InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time, + DBImpl* db_impl) + : start_time_(start_time), + end_time_(end_time), + valid_(true), + db_impl_(db_impl) { + AdvanceIteratorByTime(start_time_, end_time_); + } + // no copying allowed + InMemoryStatsHistoryIterator(const InMemoryStatsHistoryIterator&) = delete; + void operator=(const InMemoryStatsHistoryIterator&) = delete; + InMemoryStatsHistoryIterator(InMemoryStatsHistoryIterator&&) = delete; + InMemoryStatsHistoryIterator& operator=(InMemoryStatsHistoryIterator&&) = + delete; + + ~InMemoryStatsHistoryIterator() override; + bool Valid() const override; + Status status() const override; + + // Move to the next stats snapshot currently available + // This function may invalidate the iterator + // REQUIRES: Valid() + void Next() override; + + // REQUIRES: Valid() + uint64_t GetStatsTime() const override; + + // This function is idempotent + // REQUIRES: Valid() + const std::map<std::string, uint64_t>& GetStatsMap() const override; + + private: + // advance the iterator to the next stats history record with timestamp + // between [start_time, end_time) + void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time); + + uint64_t time_; + uint64_t start_time_; + uint64_t end_time_; + std::map<std::string, uint64_t> stats_map_; + Status status_; + bool valid_; + DBImpl* db_impl_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/instrumented_mutex.cc b/src/rocksdb/monitoring/instrumented_mutex.cc new file mode 100644 index 000000000..8b06c60ff --- /dev/null +++ b/src/rocksdb/monitoring/instrumented_mutex.cc @@ -0,0 +1,69 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "monitoring/instrumented_mutex.h" +#include "monitoring/perf_context_imp.h" +#include "monitoring/thread_status_util.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +Statistics* stats_for_report(Env* env, Statistics* stats) { + if (env != nullptr && stats != nullptr && + stats->get_stats_level() > kExceptTimeForMutex) { + return stats; + } else { + return nullptr; + } +} +} // namespace + +void InstrumentedMutex::Lock() { + PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD( + db_mutex_lock_nanos, stats_code_ == DB_MUTEX_WAIT_MICROS, + stats_for_report(env_, stats_), stats_code_); + LockInternal(); +} + +void InstrumentedMutex::LockInternal() { +#ifndef NDEBUG + ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); +#endif + mutex_.Lock(); +} + +void InstrumentedCondVar::Wait() { + PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD( + db_condition_wait_nanos, stats_code_ == DB_MUTEX_WAIT_MICROS, + stats_for_report(env_, stats_), stats_code_); + WaitInternal(); +} + +void InstrumentedCondVar::WaitInternal() { +#ifndef NDEBUG + ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); +#endif + cond_.Wait(); +} + +bool InstrumentedCondVar::TimedWait(uint64_t abs_time_us) { + PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD( + db_condition_wait_nanos, stats_code_ == DB_MUTEX_WAIT_MICROS, + stats_for_report(env_, stats_), stats_code_); + return TimedWaitInternal(abs_time_us); +} + +bool InstrumentedCondVar::TimedWaitInternal(uint64_t abs_time_us) { +#ifndef NDEBUG + ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); +#endif + + TEST_SYNC_POINT_CALLBACK("InstrumentedCondVar::TimedWaitInternal", + &abs_time_us); + + return cond_.TimedWait(abs_time_us); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/instrumented_mutex.h b/src/rocksdb/monitoring/instrumented_mutex.h new file mode 100644 index 000000000..50c1f29c8 --- /dev/null +++ b/src/rocksdb/monitoring/instrumented_mutex.h @@ -0,0 +1,98 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "monitoring/statistics.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/statistics.h" +#include "rocksdb/thread_status.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { +class InstrumentedCondVar; + +// A wrapper class for port::Mutex that provides additional layer +// for collecting stats and instrumentation. +class InstrumentedMutex { + public: + explicit InstrumentedMutex(bool adaptive = false) + : mutex_(adaptive), stats_(nullptr), env_(nullptr), + stats_code_(0) {} + + InstrumentedMutex( + Statistics* stats, Env* env, + int stats_code, bool adaptive = false) + : mutex_(adaptive), stats_(stats), env_(env), + stats_code_(stats_code) {} + + void Lock(); + + void Unlock() { + mutex_.Unlock(); + } + + void AssertHeld() { + mutex_.AssertHeld(); + } + + private: + void LockInternal(); + friend class InstrumentedCondVar; + port::Mutex mutex_; + Statistics* stats_; + Env* env_; + int stats_code_; +}; + +// A wrapper class for port::Mutex that provides additional layer +// for collecting stats and instrumentation. +class InstrumentedMutexLock { + public: + explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) { + mutex_->Lock(); + } + + ~InstrumentedMutexLock() { + mutex_->Unlock(); + } + + private: + InstrumentedMutex* const mutex_; + InstrumentedMutexLock(const InstrumentedMutexLock&) = delete; + void operator=(const InstrumentedMutexLock&) = delete; +}; + +class InstrumentedCondVar { + public: + explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex) + : cond_(&(instrumented_mutex->mutex_)), + stats_(instrumented_mutex->stats_), + env_(instrumented_mutex->env_), + stats_code_(instrumented_mutex->stats_code_) {} + + void Wait(); + + bool TimedWait(uint64_t abs_time_us); + + void Signal() { + cond_.Signal(); + } + + void SignalAll() { + cond_.SignalAll(); + } + + private: + void WaitInternal(); + bool TimedWaitInternal(uint64_t abs_time_us); + port::CondVar cond_; + Statistics* stats_; + Env* env_; + int stats_code_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/iostats_context.cc b/src/rocksdb/monitoring/iostats_context.cc new file mode 100644 index 000000000..2960f05e8 --- /dev/null +++ b/src/rocksdb/monitoring/iostats_context.cc @@ -0,0 +1,62 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <sstream> +#include "monitoring/iostats_context_imp.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL +__thread IOStatsContext iostats_context; +#endif + +IOStatsContext* get_iostats_context() { +#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL + return &iostats_context; +#else + return nullptr; +#endif +} + +void IOStatsContext::Reset() { + thread_pool_id = Env::Priority::TOTAL; + bytes_read = 0; + bytes_written = 0; + open_nanos = 0; + allocate_nanos = 0; + write_nanos = 0; + read_nanos = 0; + range_sync_nanos = 0; + prepare_write_nanos = 0; + fsync_nanos = 0; + logger_nanos = 0; +} + +#define IOSTATS_CONTEXT_OUTPUT(counter) \ + if (!exclude_zero_counters || counter > 0) { \ + ss << #counter << " = " << counter << ", "; \ + } + +std::string IOStatsContext::ToString(bool exclude_zero_counters) const { + std::ostringstream ss; + IOSTATS_CONTEXT_OUTPUT(thread_pool_id); + IOSTATS_CONTEXT_OUTPUT(bytes_read); + IOSTATS_CONTEXT_OUTPUT(bytes_written); + IOSTATS_CONTEXT_OUTPUT(open_nanos); + IOSTATS_CONTEXT_OUTPUT(allocate_nanos); + IOSTATS_CONTEXT_OUTPUT(write_nanos); + IOSTATS_CONTEXT_OUTPUT(read_nanos); + IOSTATS_CONTEXT_OUTPUT(range_sync_nanos); + IOSTATS_CONTEXT_OUTPUT(fsync_nanos); + IOSTATS_CONTEXT_OUTPUT(prepare_write_nanos); + IOSTATS_CONTEXT_OUTPUT(logger_nanos); + + std::string str = ss.str(); + str.erase(str.find_last_not_of(", ") + 1); + return str; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/iostats_context_imp.h b/src/rocksdb/monitoring/iostats_context_imp.h new file mode 100644 index 000000000..a7f095d6e --- /dev/null +++ b/src/rocksdb/monitoring/iostats_context_imp.h @@ -0,0 +1,60 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "monitoring/perf_step_timer.h" +#include "rocksdb/iostats_context.h" + +#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL +namespace ROCKSDB_NAMESPACE { +extern __thread IOStatsContext iostats_context; +} // namespace ROCKSDB_NAMESPACE + +// increment a specific counter by the specified value +#define IOSTATS_ADD(metric, value) (iostats_context.metric += value) + +// Increase metric value only when it is positive +#define IOSTATS_ADD_IF_POSITIVE(metric, value) \ + if (value > 0) { IOSTATS_ADD(metric, value); } + +// reset a specific counter to zero +#define IOSTATS_RESET(metric) (iostats_context.metric = 0) + +// reset all counters to zero +#define IOSTATS_RESET_ALL() (iostats_context.Reset()) + +#define IOSTATS_SET_THREAD_POOL_ID(value) \ + (iostats_context.thread_pool_id = value) + +#define IOSTATS_THREAD_POOL_ID() (iostats_context.thread_pool_id) + +#define IOSTATS(metric) (iostats_context.metric) + +// Declare and set start time of the timer +#define IOSTATS_TIMER_GUARD(metric) \ + PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \ + iostats_step_timer_##metric.Start(); + +// Declare and set start time of the timer +#define IOSTATS_CPU_TIMER_GUARD(metric, env) \ + PerfStepTimer iostats_step_timer_##metric( \ + &(iostats_context.metric), env, true, \ + PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ + iostats_step_timer_##metric.Start(); + +#else // ROCKSDB_SUPPORT_THREAD_LOCAL + +#define IOSTATS_ADD(metric, value) +#define IOSTATS_ADD_IF_POSITIVE(metric, value) +#define IOSTATS_RESET(metric) +#define IOSTATS_RESET_ALL() +#define IOSTATS_SET_THREAD_POOL_ID(value) +#define IOSTATS_THREAD_POOL_ID() +#define IOSTATS(metric) 0 + +#define IOSTATS_TIMER_GUARD(metric) +#define IOSTATS_CPU_TIMER_GUARD(metric, env) static_cast<void>(env) + +#endif // ROCKSDB_SUPPORT_THREAD_LOCAL diff --git a/src/rocksdb/monitoring/iostats_context_test.cc b/src/rocksdb/monitoring/iostats_context_test.cc new file mode 100644 index 000000000..49f6fc058 --- /dev/null +++ b/src/rocksdb/monitoring/iostats_context_test.cc @@ -0,0 +1,29 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/iostats_context.h" +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(IOStatsContextTest, ToString) { + get_iostats_context()->Reset(); + get_iostats_context()->bytes_read = 12345; + + std::string zero_included = get_iostats_context()->ToString(); + ASSERT_NE(std::string::npos, zero_included.find("= 0")); + ASSERT_NE(std::string::npos, zero_included.find("= 12345")); + + std::string zero_excluded = get_iostats_context()->ToString(true); + ASSERT_EQ(std::string::npos, zero_excluded.find("= 0")); + ASSERT_NE(std::string::npos, zero_excluded.find("= 12345")); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/monitoring/perf_context.cc b/src/rocksdb/monitoring/perf_context.cc new file mode 100644 index 000000000..446a9b766 --- /dev/null +++ b/src/rocksdb/monitoring/perf_context.cc @@ -0,0 +1,559 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include <sstream> +#include "monitoring/perf_context_imp.h" + +namespace ROCKSDB_NAMESPACE { + +#if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL) +PerfContext perf_context; +#else +#if defined(OS_SOLARIS) +__thread PerfContext perf_context_; +#else +thread_local PerfContext perf_context; +#endif +#endif + +PerfContext* get_perf_context() { +#if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL) + return &perf_context; +#else +#if defined(OS_SOLARIS) + return &perf_context_; +#else + return &perf_context; +#endif +#endif +} + +PerfContext::~PerfContext() { +#if !defined(NPERF_CONTEXT) && defined(ROCKSDB_SUPPORT_THREAD_LOCAL) && !defined(OS_SOLARIS) + ClearPerLevelPerfContext(); +#endif +} + +PerfContext::PerfContext(const PerfContext& other) { +#ifndef NPERF_CONTEXT + user_key_comparison_count = other.user_key_comparison_count; + block_cache_hit_count = other.block_cache_hit_count; + block_read_count = other.block_read_count; + block_read_byte = other.block_read_byte; + block_read_time = other.block_read_time; + block_cache_index_hit_count = other.block_cache_index_hit_count; + index_block_read_count = other.index_block_read_count; + block_cache_filter_hit_count = other.block_cache_filter_hit_count; + filter_block_read_count = other.filter_block_read_count; + compression_dict_block_read_count = other.compression_dict_block_read_count; + block_checksum_time = other.block_checksum_time; + block_decompress_time = other.block_decompress_time; + get_read_bytes = other.get_read_bytes; + multiget_read_bytes = other.multiget_read_bytes; + iter_read_bytes = other.iter_read_bytes; + internal_key_skipped_count = other.internal_key_skipped_count; + internal_delete_skipped_count = other.internal_delete_skipped_count; + internal_recent_skipped_count = other.internal_recent_skipped_count; + internal_merge_count = other.internal_merge_count; + write_wal_time = other.write_wal_time; + get_snapshot_time = other.get_snapshot_time; + get_from_memtable_time = other.get_from_memtable_time; + get_from_memtable_count = other.get_from_memtable_count; + get_post_process_time = other.get_post_process_time; + get_from_output_files_time = other.get_from_output_files_time; + seek_on_memtable_time = other.seek_on_memtable_time; + seek_on_memtable_count = other.seek_on_memtable_count; + next_on_memtable_count = other.next_on_memtable_count; + prev_on_memtable_count = other.prev_on_memtable_count; + seek_child_seek_time = other.seek_child_seek_time; + seek_child_seek_count = other.seek_child_seek_count; + seek_min_heap_time = other.seek_min_heap_time; + seek_internal_seek_time = other.seek_internal_seek_time; + find_next_user_entry_time = other.find_next_user_entry_time; + write_pre_and_post_process_time = other.write_pre_and_post_process_time; + write_memtable_time = other.write_memtable_time; + write_delay_time = other.write_delay_time; + write_thread_wait_nanos = other.write_thread_wait_nanos; + write_scheduling_flushes_compactions_time = + other.write_scheduling_flushes_compactions_time; + db_mutex_lock_nanos = other.db_mutex_lock_nanos; + db_condition_wait_nanos = other.db_condition_wait_nanos; + merge_operator_time_nanos = other.merge_operator_time_nanos; + read_index_block_nanos = other.read_index_block_nanos; + read_filter_block_nanos = other.read_filter_block_nanos; + new_table_block_iter_nanos = other.new_table_block_iter_nanos; + new_table_iterator_nanos = other.new_table_iterator_nanos; + block_seek_nanos = other.block_seek_nanos; + find_table_nanos = other.find_table_nanos; + bloom_memtable_hit_count = other.bloom_memtable_hit_count; + bloom_memtable_miss_count = other.bloom_memtable_miss_count; + bloom_sst_hit_count = other.bloom_sst_hit_count; + bloom_sst_miss_count = other.bloom_sst_miss_count; + key_lock_wait_time = other.key_lock_wait_time; + key_lock_wait_count = other.key_lock_wait_count; + + env_new_sequential_file_nanos = other.env_new_sequential_file_nanos; + env_new_random_access_file_nanos = other.env_new_random_access_file_nanos; + env_new_writable_file_nanos = other.env_new_writable_file_nanos; + env_reuse_writable_file_nanos = other.env_reuse_writable_file_nanos; + env_new_random_rw_file_nanos = other.env_new_random_rw_file_nanos; + env_new_directory_nanos = other.env_new_directory_nanos; + env_file_exists_nanos = other.env_file_exists_nanos; + env_get_children_nanos = other.env_get_children_nanos; + env_get_children_file_attributes_nanos = + other.env_get_children_file_attributes_nanos; + env_delete_file_nanos = other.env_delete_file_nanos; + env_create_dir_nanos = other.env_create_dir_nanos; + env_create_dir_if_missing_nanos = other.env_create_dir_if_missing_nanos; + env_delete_dir_nanos = other.env_delete_dir_nanos; + env_get_file_size_nanos = other.env_get_file_size_nanos; + env_get_file_modification_time_nanos = + other.env_get_file_modification_time_nanos; + env_rename_file_nanos = other.env_rename_file_nanos; + env_link_file_nanos = other.env_link_file_nanos; + env_lock_file_nanos = other.env_lock_file_nanos; + env_unlock_file_nanos = other.env_unlock_file_nanos; + env_new_logger_nanos = other.env_new_logger_nanos; + get_cpu_nanos = other.get_cpu_nanos; + iter_next_cpu_nanos = other.iter_next_cpu_nanos; + iter_prev_cpu_nanos = other.iter_prev_cpu_nanos; + iter_seek_cpu_nanos = other.iter_seek_cpu_nanos; + if (per_level_perf_context_enabled && level_to_perf_context != nullptr) { + ClearPerLevelPerfContext(); + } + if (other.level_to_perf_context != nullptr) { + level_to_perf_context = new std::map<uint32_t, PerfContextByLevel>(); + *level_to_perf_context = *other.level_to_perf_context; + } + per_level_perf_context_enabled = other.per_level_perf_context_enabled; +#endif +} + +PerfContext::PerfContext(PerfContext&& other) noexcept { +#ifndef NPERF_CONTEXT + user_key_comparison_count = other.user_key_comparison_count; + block_cache_hit_count = other.block_cache_hit_count; + block_read_count = other.block_read_count; + block_read_byte = other.block_read_byte; + block_read_time = other.block_read_time; + block_cache_index_hit_count = other.block_cache_index_hit_count; + index_block_read_count = other.index_block_read_count; + block_cache_filter_hit_count = other.block_cache_filter_hit_count; + filter_block_read_count = other.filter_block_read_count; + compression_dict_block_read_count = other.compression_dict_block_read_count; + block_checksum_time = other.block_checksum_time; + block_decompress_time = other.block_decompress_time; + get_read_bytes = other.get_read_bytes; + multiget_read_bytes = other.multiget_read_bytes; + iter_read_bytes = other.iter_read_bytes; + internal_key_skipped_count = other.internal_key_skipped_count; + internal_delete_skipped_count = other.internal_delete_skipped_count; + internal_recent_skipped_count = other.internal_recent_skipped_count; + internal_merge_count = other.internal_merge_count; + write_wal_time = other.write_wal_time; + get_snapshot_time = other.get_snapshot_time; + get_from_memtable_time = other.get_from_memtable_time; + get_from_memtable_count = other.get_from_memtable_count; + get_post_process_time = other.get_post_process_time; + get_from_output_files_time = other.get_from_output_files_time; + seek_on_memtable_time = other.seek_on_memtable_time; + seek_on_memtable_count = other.seek_on_memtable_count; + next_on_memtable_count = other.next_on_memtable_count; + prev_on_memtable_count = other.prev_on_memtable_count; + seek_child_seek_time = other.seek_child_seek_time; + seek_child_seek_count = other.seek_child_seek_count; + seek_min_heap_time = other.seek_min_heap_time; + seek_internal_seek_time = other.seek_internal_seek_time; + find_next_user_entry_time = other.find_next_user_entry_time; + write_pre_and_post_process_time = other.write_pre_and_post_process_time; + write_memtable_time = other.write_memtable_time; + write_delay_time = other.write_delay_time; + write_thread_wait_nanos = other.write_thread_wait_nanos; + write_scheduling_flushes_compactions_time = + other.write_scheduling_flushes_compactions_time; + db_mutex_lock_nanos = other.db_mutex_lock_nanos; + db_condition_wait_nanos = other.db_condition_wait_nanos; + merge_operator_time_nanos = other.merge_operator_time_nanos; + read_index_block_nanos = other.read_index_block_nanos; + read_filter_block_nanos = other.read_filter_block_nanos; + new_table_block_iter_nanos = other.new_table_block_iter_nanos; + new_table_iterator_nanos = other.new_table_iterator_nanos; + block_seek_nanos = other.block_seek_nanos; + find_table_nanos = other.find_table_nanos; + bloom_memtable_hit_count = other.bloom_memtable_hit_count; + bloom_memtable_miss_count = other.bloom_memtable_miss_count; + bloom_sst_hit_count = other.bloom_sst_hit_count; + bloom_sst_miss_count = other.bloom_sst_miss_count; + key_lock_wait_time = other.key_lock_wait_time; + key_lock_wait_count = other.key_lock_wait_count; + + env_new_sequential_file_nanos = other.env_new_sequential_file_nanos; + env_new_random_access_file_nanos = other.env_new_random_access_file_nanos; + env_new_writable_file_nanos = other.env_new_writable_file_nanos; + env_reuse_writable_file_nanos = other.env_reuse_writable_file_nanos; + env_new_random_rw_file_nanos = other.env_new_random_rw_file_nanos; + env_new_directory_nanos = other.env_new_directory_nanos; + env_file_exists_nanos = other.env_file_exists_nanos; + env_get_children_nanos = other.env_get_children_nanos; + env_get_children_file_attributes_nanos = + other.env_get_children_file_attributes_nanos; + env_delete_file_nanos = other.env_delete_file_nanos; + env_create_dir_nanos = other.env_create_dir_nanos; + env_create_dir_if_missing_nanos = other.env_create_dir_if_missing_nanos; + env_delete_dir_nanos = other.env_delete_dir_nanos; + env_get_file_size_nanos = other.env_get_file_size_nanos; + env_get_file_modification_time_nanos = + other.env_get_file_modification_time_nanos; + env_rename_file_nanos = other.env_rename_file_nanos; + env_link_file_nanos = other.env_link_file_nanos; + env_lock_file_nanos = other.env_lock_file_nanos; + env_unlock_file_nanos = other.env_unlock_file_nanos; + env_new_logger_nanos = other.env_new_logger_nanos; + get_cpu_nanos = other.get_cpu_nanos; + iter_next_cpu_nanos = other.iter_next_cpu_nanos; + iter_prev_cpu_nanos = other.iter_prev_cpu_nanos; + iter_seek_cpu_nanos = other.iter_seek_cpu_nanos; + if (per_level_perf_context_enabled && level_to_perf_context != nullptr) { + ClearPerLevelPerfContext(); + } + if (other.level_to_perf_context != nullptr) { + level_to_perf_context = other.level_to_perf_context; + other.level_to_perf_context = nullptr; + } + per_level_perf_context_enabled = other.per_level_perf_context_enabled; +#endif +} + +// TODO(Zhongyi): reduce code duplication between copy constructor and +// assignment operator +PerfContext& PerfContext::operator=(const PerfContext& other) { +#ifndef NPERF_CONTEXT + user_key_comparison_count = other.user_key_comparison_count; + block_cache_hit_count = other.block_cache_hit_count; + block_read_count = other.block_read_count; + block_read_byte = other.block_read_byte; + block_read_time = other.block_read_time; + block_cache_index_hit_count = other.block_cache_index_hit_count; + index_block_read_count = other.index_block_read_count; + block_cache_filter_hit_count = other.block_cache_filter_hit_count; + filter_block_read_count = other.filter_block_read_count; + compression_dict_block_read_count = other.compression_dict_block_read_count; + block_checksum_time = other.block_checksum_time; + block_decompress_time = other.block_decompress_time; + get_read_bytes = other.get_read_bytes; + multiget_read_bytes = other.multiget_read_bytes; + iter_read_bytes = other.iter_read_bytes; + internal_key_skipped_count = other.internal_key_skipped_count; + internal_delete_skipped_count = other.internal_delete_skipped_count; + internal_recent_skipped_count = other.internal_recent_skipped_count; + internal_merge_count = other.internal_merge_count; + write_wal_time = other.write_wal_time; + get_snapshot_time = other.get_snapshot_time; + get_from_memtable_time = other.get_from_memtable_time; + get_from_memtable_count = other.get_from_memtable_count; + get_post_process_time = other.get_post_process_time; + get_from_output_files_time = other.get_from_output_files_time; + seek_on_memtable_time = other.seek_on_memtable_time; + seek_on_memtable_count = other.seek_on_memtable_count; + next_on_memtable_count = other.next_on_memtable_count; + prev_on_memtable_count = other.prev_on_memtable_count; + seek_child_seek_time = other.seek_child_seek_time; + seek_child_seek_count = other.seek_child_seek_count; + seek_min_heap_time = other.seek_min_heap_time; + seek_internal_seek_time = other.seek_internal_seek_time; + find_next_user_entry_time = other.find_next_user_entry_time; + write_pre_and_post_process_time = other.write_pre_and_post_process_time; + write_memtable_time = other.write_memtable_time; + write_delay_time = other.write_delay_time; + write_thread_wait_nanos = other.write_thread_wait_nanos; + write_scheduling_flushes_compactions_time = + other.write_scheduling_flushes_compactions_time; + db_mutex_lock_nanos = other.db_mutex_lock_nanos; + db_condition_wait_nanos = other.db_condition_wait_nanos; + merge_operator_time_nanos = other.merge_operator_time_nanos; + read_index_block_nanos = other.read_index_block_nanos; + read_filter_block_nanos = other.read_filter_block_nanos; + new_table_block_iter_nanos = other.new_table_block_iter_nanos; + new_table_iterator_nanos = other.new_table_iterator_nanos; + block_seek_nanos = other.block_seek_nanos; + find_table_nanos = other.find_table_nanos; + bloom_memtable_hit_count = other.bloom_memtable_hit_count; + bloom_memtable_miss_count = other.bloom_memtable_miss_count; + bloom_sst_hit_count = other.bloom_sst_hit_count; + bloom_sst_miss_count = other.bloom_sst_miss_count; + key_lock_wait_time = other.key_lock_wait_time; + key_lock_wait_count = other.key_lock_wait_count; + + env_new_sequential_file_nanos = other.env_new_sequential_file_nanos; + env_new_random_access_file_nanos = other.env_new_random_access_file_nanos; + env_new_writable_file_nanos = other.env_new_writable_file_nanos; + env_reuse_writable_file_nanos = other.env_reuse_writable_file_nanos; + env_new_random_rw_file_nanos = other.env_new_random_rw_file_nanos; + env_new_directory_nanos = other.env_new_directory_nanos; + env_file_exists_nanos = other.env_file_exists_nanos; + env_get_children_nanos = other.env_get_children_nanos; + env_get_children_file_attributes_nanos = + other.env_get_children_file_attributes_nanos; + env_delete_file_nanos = other.env_delete_file_nanos; + env_create_dir_nanos = other.env_create_dir_nanos; + env_create_dir_if_missing_nanos = other.env_create_dir_if_missing_nanos; + env_delete_dir_nanos = other.env_delete_dir_nanos; + env_get_file_size_nanos = other.env_get_file_size_nanos; + env_get_file_modification_time_nanos = + other.env_get_file_modification_time_nanos; + env_rename_file_nanos = other.env_rename_file_nanos; + env_link_file_nanos = other.env_link_file_nanos; + env_lock_file_nanos = other.env_lock_file_nanos; + env_unlock_file_nanos = other.env_unlock_file_nanos; + env_new_logger_nanos = other.env_new_logger_nanos; + get_cpu_nanos = other.get_cpu_nanos; + iter_next_cpu_nanos = other.iter_next_cpu_nanos; + iter_prev_cpu_nanos = other.iter_prev_cpu_nanos; + iter_seek_cpu_nanos = other.iter_seek_cpu_nanos; + if (per_level_perf_context_enabled && level_to_perf_context != nullptr) { + ClearPerLevelPerfContext(); + } + if (other.level_to_perf_context != nullptr) { + level_to_perf_context = new std::map<uint32_t, PerfContextByLevel>(); + *level_to_perf_context = *other.level_to_perf_context; + } + per_level_perf_context_enabled = other.per_level_perf_context_enabled; +#endif + return *this; +} + +void PerfContext::Reset() { +#ifndef NPERF_CONTEXT + user_key_comparison_count = 0; + block_cache_hit_count = 0; + block_read_count = 0; + block_read_byte = 0; + block_read_time = 0; + block_cache_index_hit_count = 0; + index_block_read_count = 0; + block_cache_filter_hit_count = 0; + filter_block_read_count = 0; + compression_dict_block_read_count = 0; + block_checksum_time = 0; + block_decompress_time = 0; + get_read_bytes = 0; + multiget_read_bytes = 0; + iter_read_bytes = 0; + internal_key_skipped_count = 0; + internal_delete_skipped_count = 0; + internal_recent_skipped_count = 0; + internal_merge_count = 0; + write_wal_time = 0; + + get_snapshot_time = 0; + get_from_memtable_time = 0; + get_from_memtable_count = 0; + get_post_process_time = 0; + get_from_output_files_time = 0; + seek_on_memtable_time = 0; + seek_on_memtable_count = 0; + next_on_memtable_count = 0; + prev_on_memtable_count = 0; + seek_child_seek_time = 0; + seek_child_seek_count = 0; + seek_min_heap_time = 0; + seek_internal_seek_time = 0; + find_next_user_entry_time = 0; + write_pre_and_post_process_time = 0; + write_memtable_time = 0; + write_delay_time = 0; + write_thread_wait_nanos = 0; + write_scheduling_flushes_compactions_time = 0; + db_mutex_lock_nanos = 0; + db_condition_wait_nanos = 0; + merge_operator_time_nanos = 0; + read_index_block_nanos = 0; + read_filter_block_nanos = 0; + new_table_block_iter_nanos = 0; + new_table_iterator_nanos = 0; + block_seek_nanos = 0; + find_table_nanos = 0; + bloom_memtable_hit_count = 0; + bloom_memtable_miss_count = 0; + bloom_sst_hit_count = 0; + bloom_sst_miss_count = 0; + key_lock_wait_time = 0; + key_lock_wait_count = 0; + + env_new_sequential_file_nanos = 0; + env_new_random_access_file_nanos = 0; + env_new_writable_file_nanos = 0; + env_reuse_writable_file_nanos = 0; + env_new_random_rw_file_nanos = 0; + env_new_directory_nanos = 0; + env_file_exists_nanos = 0; + env_get_children_nanos = 0; + env_get_children_file_attributes_nanos = 0; + env_delete_file_nanos = 0; + env_create_dir_nanos = 0; + env_create_dir_if_missing_nanos = 0; + env_delete_dir_nanos = 0; + env_get_file_size_nanos = 0; + env_get_file_modification_time_nanos = 0; + env_rename_file_nanos = 0; + env_link_file_nanos = 0; + env_lock_file_nanos = 0; + env_unlock_file_nanos = 0; + env_new_logger_nanos = 0; + get_cpu_nanos = 0; + iter_next_cpu_nanos = 0; + iter_prev_cpu_nanos = 0; + iter_seek_cpu_nanos = 0; + if (per_level_perf_context_enabled && level_to_perf_context) { + for (auto& kv : *level_to_perf_context) { + kv.second.Reset(); + } + } +#endif +} + +#define PERF_CONTEXT_OUTPUT(counter) \ + if (!exclude_zero_counters || (counter > 0)) { \ + ss << #counter << " = " << counter << ", "; \ + } + +#define PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(counter) \ + if (per_level_perf_context_enabled && \ + level_to_perf_context) { \ + ss << #counter << " = "; \ + for (auto& kv : *level_to_perf_context) { \ + if (!exclude_zero_counters || (kv.second.counter > 0)) { \ + ss << kv.second.counter << "@level" << kv.first << ", "; \ + } \ + } \ + } + +void PerfContextByLevel::Reset() { +#ifndef NPERF_CONTEXT + bloom_filter_useful = 0; + bloom_filter_full_positive = 0; + bloom_filter_full_true_positive = 0; + block_cache_hit_count = 0; + block_cache_miss_count = 0; +#endif +} + +std::string PerfContext::ToString(bool exclude_zero_counters) const { +#ifdef NPERF_CONTEXT + return ""; +#else + std::ostringstream ss; + PERF_CONTEXT_OUTPUT(user_key_comparison_count); + PERF_CONTEXT_OUTPUT(block_cache_hit_count); + PERF_CONTEXT_OUTPUT(block_read_count); + PERF_CONTEXT_OUTPUT(block_read_byte); + PERF_CONTEXT_OUTPUT(block_read_time); + PERF_CONTEXT_OUTPUT(block_cache_index_hit_count); + PERF_CONTEXT_OUTPUT(index_block_read_count); + PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count); + PERF_CONTEXT_OUTPUT(filter_block_read_count); + PERF_CONTEXT_OUTPUT(compression_dict_block_read_count); + PERF_CONTEXT_OUTPUT(block_checksum_time); + PERF_CONTEXT_OUTPUT(block_decompress_time); + PERF_CONTEXT_OUTPUT(get_read_bytes); + PERF_CONTEXT_OUTPUT(multiget_read_bytes); + PERF_CONTEXT_OUTPUT(iter_read_bytes); + PERF_CONTEXT_OUTPUT(internal_key_skipped_count); + PERF_CONTEXT_OUTPUT(internal_delete_skipped_count); + PERF_CONTEXT_OUTPUT(internal_recent_skipped_count); + PERF_CONTEXT_OUTPUT(internal_merge_count); + PERF_CONTEXT_OUTPUT(write_wal_time); + PERF_CONTEXT_OUTPUT(get_snapshot_time); + PERF_CONTEXT_OUTPUT(get_from_memtable_time); + PERF_CONTEXT_OUTPUT(get_from_memtable_count); + PERF_CONTEXT_OUTPUT(get_post_process_time); + PERF_CONTEXT_OUTPUT(get_from_output_files_time); + PERF_CONTEXT_OUTPUT(seek_on_memtable_time); + PERF_CONTEXT_OUTPUT(seek_on_memtable_count); + PERF_CONTEXT_OUTPUT(next_on_memtable_count); + PERF_CONTEXT_OUTPUT(prev_on_memtable_count); + PERF_CONTEXT_OUTPUT(seek_child_seek_time); + PERF_CONTEXT_OUTPUT(seek_child_seek_count); + PERF_CONTEXT_OUTPUT(seek_min_heap_time); + PERF_CONTEXT_OUTPUT(seek_internal_seek_time); + PERF_CONTEXT_OUTPUT(find_next_user_entry_time); + PERF_CONTEXT_OUTPUT(write_pre_and_post_process_time); + PERF_CONTEXT_OUTPUT(write_memtable_time); + PERF_CONTEXT_OUTPUT(write_thread_wait_nanos); + PERF_CONTEXT_OUTPUT(write_scheduling_flushes_compactions_time); + PERF_CONTEXT_OUTPUT(db_mutex_lock_nanos); + PERF_CONTEXT_OUTPUT(db_condition_wait_nanos); + PERF_CONTEXT_OUTPUT(merge_operator_time_nanos); + PERF_CONTEXT_OUTPUT(write_delay_time); + PERF_CONTEXT_OUTPUT(read_index_block_nanos); + PERF_CONTEXT_OUTPUT(read_filter_block_nanos); + PERF_CONTEXT_OUTPUT(new_table_block_iter_nanos); + PERF_CONTEXT_OUTPUT(new_table_iterator_nanos); + PERF_CONTEXT_OUTPUT(block_seek_nanos); + PERF_CONTEXT_OUTPUT(find_table_nanos); + PERF_CONTEXT_OUTPUT(bloom_memtable_hit_count); + PERF_CONTEXT_OUTPUT(bloom_memtable_miss_count); + PERF_CONTEXT_OUTPUT(bloom_sst_hit_count); + PERF_CONTEXT_OUTPUT(bloom_sst_miss_count); + PERF_CONTEXT_OUTPUT(key_lock_wait_time); + PERF_CONTEXT_OUTPUT(key_lock_wait_count); + PERF_CONTEXT_OUTPUT(env_new_sequential_file_nanos); + PERF_CONTEXT_OUTPUT(env_new_random_access_file_nanos); + PERF_CONTEXT_OUTPUT(env_new_writable_file_nanos); + PERF_CONTEXT_OUTPUT(env_reuse_writable_file_nanos); + PERF_CONTEXT_OUTPUT(env_new_random_rw_file_nanos); + PERF_CONTEXT_OUTPUT(env_new_directory_nanos); + PERF_CONTEXT_OUTPUT(env_file_exists_nanos); + PERF_CONTEXT_OUTPUT(env_get_children_nanos); + PERF_CONTEXT_OUTPUT(env_get_children_file_attributes_nanos); + PERF_CONTEXT_OUTPUT(env_delete_file_nanos); + PERF_CONTEXT_OUTPUT(env_create_dir_nanos); + PERF_CONTEXT_OUTPUT(env_create_dir_if_missing_nanos); + PERF_CONTEXT_OUTPUT(env_delete_dir_nanos); + PERF_CONTEXT_OUTPUT(env_get_file_size_nanos); + PERF_CONTEXT_OUTPUT(env_get_file_modification_time_nanos); + PERF_CONTEXT_OUTPUT(env_rename_file_nanos); + PERF_CONTEXT_OUTPUT(env_link_file_nanos); + PERF_CONTEXT_OUTPUT(env_lock_file_nanos); + PERF_CONTEXT_OUTPUT(env_unlock_file_nanos); + PERF_CONTEXT_OUTPUT(env_new_logger_nanos); + PERF_CONTEXT_OUTPUT(get_cpu_nanos); + PERF_CONTEXT_OUTPUT(iter_next_cpu_nanos); + PERF_CONTEXT_OUTPUT(iter_prev_cpu_nanos); + PERF_CONTEXT_OUTPUT(iter_seek_cpu_nanos); + PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_useful); + PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_full_positive); + PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(bloom_filter_full_true_positive); + PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(block_cache_hit_count); + PERF_CONTEXT_BY_LEVEL_OUTPUT_ONE_COUNTER(block_cache_miss_count); + + std::string str = ss.str(); + str.erase(str.find_last_not_of(", ") + 1); + return str; +#endif +} + +void PerfContext::EnablePerLevelPerfContext() { + if (level_to_perf_context == nullptr) { + level_to_perf_context = new std::map<uint32_t, PerfContextByLevel>(); + } + per_level_perf_context_enabled = true; +} + +void PerfContext::DisablePerLevelPerfContext(){ + per_level_perf_context_enabled = false; +} + +void PerfContext::ClearPerLevelPerfContext(){ + if (level_to_perf_context != nullptr) { + level_to_perf_context->clear(); + delete level_to_perf_context; + level_to_perf_context = nullptr; + } + per_level_perf_context_enabled = false; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/perf_context_imp.h b/src/rocksdb/monitoring/perf_context_imp.h new file mode 100644 index 000000000..cdca27621 --- /dev/null +++ b/src/rocksdb/monitoring/perf_context_imp.h @@ -0,0 +1,97 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "monitoring/perf_step_timer.h" +#include "rocksdb/perf_context.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { +#if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL) +extern PerfContext perf_context; +#else +#if defined(OS_SOLARIS) +extern __thread PerfContext perf_context_; +#define perf_context (*get_perf_context()) +#else +extern thread_local PerfContext perf_context; +#endif +#endif + +#if defined(NPERF_CONTEXT) + +#define PERF_TIMER_STOP(metric) +#define PERF_TIMER_START(metric) +#define PERF_TIMER_GUARD(metric) +#define PERF_TIMER_GUARD_WITH_ENV(metric, env) +#define PERF_CPU_TIMER_GUARD(metric, env) +#define PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(metric, condition, stats, \ + ticker_type) +#define PERF_TIMER_MEASURE(metric) +#define PERF_COUNTER_ADD(metric, value) +#define PERF_COUNTER_BY_LEVEL_ADD(metric, value, level) + +#else + +// Stop the timer and update the metric +#define PERF_TIMER_STOP(metric) perf_step_timer_##metric.Stop(); + +#define PERF_TIMER_START(metric) perf_step_timer_##metric.Start(); + +// Declare and set start time of the timer +#define PERF_TIMER_GUARD(metric) \ + PerfStepTimer perf_step_timer_##metric(&(perf_context.metric)); \ + perf_step_timer_##metric.Start(); + +// Declare and set start time of the timer +#define PERF_TIMER_GUARD_WITH_ENV(metric, env) \ + PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), env); \ + perf_step_timer_##metric.Start(); + +// Declare and set start time of the timer +#define PERF_CPU_TIMER_GUARD(metric, env) \ + PerfStepTimer perf_step_timer_##metric( \ + &(perf_context.metric), env, true, \ + PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ + perf_step_timer_##metric.Start(); + +#define PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(metric, condition, stats, \ + ticker_type) \ + PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), nullptr, \ + false, PerfLevel::kEnableTime, stats, \ + ticker_type); \ + if (condition) { \ + perf_step_timer_##metric.Start(); \ + } + +// Update metric with time elapsed since last START. start time is reset +// to current timestamp. +#define PERF_TIMER_MEASURE(metric) perf_step_timer_##metric.Measure(); + +// Increase metric value +#define PERF_COUNTER_ADD(metric, value) \ + if (perf_level >= PerfLevel::kEnableCount) { \ + perf_context.metric += value; \ + } + +// Increase metric value +#define PERF_COUNTER_BY_LEVEL_ADD(metric, value, level) \ + if (perf_level >= PerfLevel::kEnableCount && \ + perf_context.per_level_perf_context_enabled && \ + perf_context.level_to_perf_context) { \ + if ((*(perf_context.level_to_perf_context)).find(level) != \ + (*(perf_context.level_to_perf_context)).end()) { \ + (*(perf_context.level_to_perf_context))[level].metric += value; \ + } \ + else { \ + PerfContextByLevel empty_context; \ + (*(perf_context.level_to_perf_context))[level] = empty_context; \ + (*(perf_context.level_to_perf_context))[level].metric += value; \ + } \ + } \ + +#endif + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/perf_level.cc b/src/rocksdb/monitoring/perf_level.cc new file mode 100644 index 000000000..27bff0d28 --- /dev/null +++ b/src/rocksdb/monitoring/perf_level.cc @@ -0,0 +1,28 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include <assert.h> +#include "monitoring/perf_level_imp.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL +__thread PerfLevel perf_level = kEnableCount; +#else +PerfLevel perf_level = kEnableCount; +#endif + +void SetPerfLevel(PerfLevel level) { + assert(level > kUninitialized); + assert(level < kOutOfBounds); + perf_level = level; +} + +PerfLevel GetPerfLevel() { + return perf_level; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/perf_level_imp.h b/src/rocksdb/monitoring/perf_level_imp.h new file mode 100644 index 000000000..01277af57 --- /dev/null +++ b/src/rocksdb/monitoring/perf_level_imp.h @@ -0,0 +1,18 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "rocksdb/perf_level.h" +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL +extern __thread PerfLevel perf_level; +#else +extern PerfLevel perf_level; +#endif + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/perf_step_timer.h b/src/rocksdb/monitoring/perf_step_timer.h new file mode 100644 index 000000000..f2d35d9d6 --- /dev/null +++ b/src/rocksdb/monitoring/perf_step_timer.h @@ -0,0 +1,79 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "monitoring/perf_level_imp.h" +#include "rocksdb/env.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { + +class PerfStepTimer { + public: + explicit PerfStepTimer( + uint64_t* metric, Env* env = nullptr, bool use_cpu_time = false, + PerfLevel enable_level = PerfLevel::kEnableTimeExceptForMutex, + Statistics* statistics = nullptr, uint32_t ticker_type = 0) + : perf_counter_enabled_(perf_level >= enable_level), + use_cpu_time_(use_cpu_time), + env_((perf_counter_enabled_ || statistics != nullptr) + ? ((env != nullptr) ? env : Env::Default()) + : nullptr), + start_(0), + metric_(metric), + statistics_(statistics), + ticker_type_(ticker_type) {} + + ~PerfStepTimer() { + Stop(); + } + + void Start() { + if (perf_counter_enabled_ || statistics_ != nullptr) { + start_ = time_now(); + } + } + + uint64_t time_now() { + if (!use_cpu_time_) { + return env_->NowNanos(); + } else { + return env_->NowCPUNanos(); + } + } + + void Measure() { + if (start_) { + uint64_t now = time_now(); + *metric_ += now - start_; + start_ = now; + } + } + + void Stop() { + if (start_) { + uint64_t duration = time_now() - start_; + if (perf_counter_enabled_) { + *metric_ += duration; + } + + if (statistics_ != nullptr) { + RecordTick(statistics_, ticker_type_, duration); + } + start_ = 0; + } + } + + private: + const bool perf_counter_enabled_; + const bool use_cpu_time_; + Env* const env_; + uint64_t start_; + uint64_t* metric_; + Statistics* statistics_; + uint32_t ticker_type_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/persistent_stats_history.cc b/src/rocksdb/monitoring/persistent_stats_history.cc new file mode 100644 index 000000000..7cc869cf2 --- /dev/null +++ b/src/rocksdb/monitoring/persistent_stats_history.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "monitoring/persistent_stats_history.h" + +#include <cstring> +#include <string> +#include <utility> +#include "db/db_impl/db_impl.h" +#include "port/likely.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +// 10 digit seconds timestamp => [Sep 9, 2001 ~ Nov 20, 2286] +const int kNowSecondsStringLength = 10; +const std::string kFormatVersionKeyString = + "__persistent_stats_format_version__"; +const std::string kCompatibleVersionKeyString = + "__persistent_stats_compatible_version__"; +// Every release maintains two versions numbers for persistents stats: Current +// format version and compatible format version. Current format version +// designates what type of encoding will be used when writing to stats CF; +// compatible format version designates the minimum format version that +// can decode the stats CF encoded using the current format version. +const uint64_t kStatsCFCurrentFormatVersion = 1; +const uint64_t kStatsCFCompatibleFormatVersion = 1; + +Status DecodePersistentStatsVersionNumber(DBImpl* db, StatsVersionKeyType type, + uint64_t* version_number) { + if (type >= StatsVersionKeyType::kKeyTypeMax) { + return Status::InvalidArgument("Invalid stats version key type provided"); + } + std::string key; + if (type == StatsVersionKeyType::kFormatVersion) { + key = kFormatVersionKeyString; + } else if (type == StatsVersionKeyType::kCompatibleVersion) { + key = kCompatibleVersionKeyString; + } + ReadOptions options; + options.verify_checksums = true; + std::string result; + Status s = db->Get(options, db->PersistentStatsColumnFamily(), key, &result); + if (!s.ok() || result.empty()) { + return Status::NotFound("Persistent stats version key " + key + + " not found."); + } + + // read version_number but do nothing in current version + *version_number = ParseUint64(result); + return Status::OK(); +} + +int EncodePersistentStatsKey(uint64_t now_seconds, const std::string& key, + int size, char* buf) { + char timestamp[kNowSecondsStringLength + 1]; + // make time stamp string equal in length to allow sorting by time + snprintf(timestamp, sizeof(timestamp), "%010d", + static_cast<int>(now_seconds)); + timestamp[kNowSecondsStringLength] = '\0'; + return snprintf(buf, size, "%s#%s", timestamp, key.c_str()); +} + +void OptimizeForPersistentStats(ColumnFamilyOptions* cfo) { + cfo->write_buffer_size = 2 << 20; + cfo->target_file_size_base = 2 * 1048576; + cfo->max_bytes_for_level_base = 10 * 1048576; + cfo->soft_pending_compaction_bytes_limit = 256 * 1048576; + cfo->hard_pending_compaction_bytes_limit = 1073741824ul; + cfo->compression = kNoCompression; +} + +PersistentStatsHistoryIterator::~PersistentStatsHistoryIterator() {} + +bool PersistentStatsHistoryIterator::Valid() const { return valid_; } + +Status PersistentStatsHistoryIterator::status() const { return status_; } + +void PersistentStatsHistoryIterator::Next() { + // increment start_time by 1 to avoid infinite loop + AdvanceIteratorByTime(GetStatsTime() + 1, end_time_); +} + +uint64_t PersistentStatsHistoryIterator::GetStatsTime() const { return time_; } + +const std::map<std::string, uint64_t>& +PersistentStatsHistoryIterator::GetStatsMap() const { + return stats_map_; +} + +std::pair<uint64_t, std::string> parseKey(const Slice& key, + uint64_t start_time) { + std::pair<uint64_t, std::string> result; + std::string key_str = key.ToString(); + std::string::size_type pos = key_str.find("#"); + // TODO(Zhongyi): add counters to track parse failures? + if (pos == std::string::npos) { + result.first = port::kMaxUint64; + result.second.clear(); + } else { + uint64_t parsed_time = ParseUint64(key_str.substr(0, pos)); + // skip entries with timestamp smaller than start_time + if (parsed_time < start_time) { + result.first = port::kMaxUint64; + result.second = ""; + } else { + result.first = parsed_time; + std::string key_resize = key_str.substr(pos + 1); + result.second = key_resize; + } + } + return result; +} + +// advance the iterator to the next time between [start_time, end_time) +// if success, update time_ and stats_map_ with new_time and stats_map +void PersistentStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time, + uint64_t end_time) { + // try to find next entry in stats_history_ map + if (db_impl_ != nullptr) { + ReadOptions ro; + Iterator* iter = + db_impl_->NewIterator(ro, db_impl_->PersistentStatsColumnFamily()); + + char timestamp[kNowSecondsStringLength + 1]; + snprintf(timestamp, sizeof(timestamp), "%010d", + static_cast<int>(std::max(time_, start_time))); + timestamp[kNowSecondsStringLength] = '\0'; + + iter->Seek(timestamp); + // no more entries with timestamp >= start_time is found or version key + // is found to be incompatible + if (!iter->Valid()) { + valid_ = false; + delete iter; + return; + } + time_ = parseKey(iter->key(), start_time).first; + valid_ = true; + // check parsed time and invalid if it exceeds end_time + if (time_ > end_time) { + valid_ = false; + delete iter; + return; + } + // find all entries with timestamp equal to time_ + std::map<std::string, uint64_t> new_stats_map; + std::pair<uint64_t, std::string> kv; + for (; iter->Valid(); iter->Next()) { + kv = parseKey(iter->key(), start_time); + if (kv.first != time_) { + break; + } + if (kv.second.compare(kFormatVersionKeyString) == 0) { + continue; + } + new_stats_map[kv.second] = ParseUint64(iter->value().ToString()); + } + stats_map_.swap(new_stats_map); + delete iter; + } else { + valid_ = false; + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/persistent_stats_history.h b/src/rocksdb/monitoring/persistent_stats_history.h new file mode 100644 index 000000000..7c711fe4e --- /dev/null +++ b/src/rocksdb/monitoring/persistent_stats_history.h @@ -0,0 +1,83 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "db/db_impl/db_impl.h" +#include "rocksdb/stats_history.h" + +namespace ROCKSDB_NAMESPACE { + +extern const std::string kFormatVersionKeyString; +extern const std::string kCompatibleVersionKeyString; +extern const uint64_t kStatsCFCurrentFormatVersion; +extern const uint64_t kStatsCFCompatibleFormatVersion; + +enum StatsVersionKeyType : uint32_t { + kFormatVersion = 1, + kCompatibleVersion = 2, + kKeyTypeMax = 3 +}; + +// Read the version number from persitent stats cf depending on type provided +// stores the version number in `*version_number` +// returns Status::OK() on success, or other status code on failure +Status DecodePersistentStatsVersionNumber(DBImpl* db, StatsVersionKeyType type, + uint64_t* version_number); + +// Encode timestamp and stats key into buf +// Format: timestamp(10 digit) + '#' + key +// Total length of encoded key will be capped at 100 bytes +int EncodePersistentStatsKey(uint64_t timestamp, const std::string& key, + int size, char* buf); + +void OptimizeForPersistentStats(ColumnFamilyOptions* cfo); + +class PersistentStatsHistoryIterator final : public StatsHistoryIterator { + public: + PersistentStatsHistoryIterator(uint64_t start_time, uint64_t end_time, + DBImpl* db_impl) + : time_(0), + start_time_(start_time), + end_time_(end_time), + valid_(true), + db_impl_(db_impl) { + AdvanceIteratorByTime(start_time_, end_time_); + } + ~PersistentStatsHistoryIterator() override; + bool Valid() const override; + Status status() const override; + + void Next() override; + uint64_t GetStatsTime() const override; + + const std::map<std::string, uint64_t>& GetStatsMap() const override; + + private: + // advance the iterator to the next stats history record with timestamp + // between [start_time, end_time) + void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time); + + // No copying allowed + PersistentStatsHistoryIterator(const PersistentStatsHistoryIterator&) = + delete; + void operator=(const PersistentStatsHistoryIterator&) = delete; + PersistentStatsHistoryIterator(PersistentStatsHistoryIterator&&) = delete; + PersistentStatsHistoryIterator& operator=(PersistentStatsHistoryIterator&&) = + delete; + + uint64_t time_; + uint64_t start_time_; + uint64_t end_time_; + std::map<std::string, uint64_t> stats_map_; + Status status_; + bool valid_; + DBImpl* db_impl_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/statistics.cc b/src/rocksdb/monitoring/statistics.cc new file mode 100644 index 000000000..ff610c8a1 --- /dev/null +++ b/src/rocksdb/monitoring/statistics.cc @@ -0,0 +1,406 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "monitoring/statistics.h" + +#include <algorithm> +#include <cinttypes> +#include <cstdio> +#include "port/likely.h" +#include "rocksdb/statistics.h" + +namespace ROCKSDB_NAMESPACE { + +// The order of items listed in Tickers should be the same as +// the order listed in TickersNameMap +const std::vector<std::pair<Tickers, std::string>> TickersNameMap = { + {BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"}, + {BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"}, + {BLOCK_CACHE_ADD, "rocksdb.block.cache.add"}, + {BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"}, + {BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"}, + {BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"}, + {BLOCK_CACHE_INDEX_ADD, "rocksdb.block.cache.index.add"}, + {BLOCK_CACHE_INDEX_BYTES_INSERT, "rocksdb.block.cache.index.bytes.insert"}, + {BLOCK_CACHE_INDEX_BYTES_EVICT, "rocksdb.block.cache.index.bytes.evict"}, + {BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"}, + {BLOCK_CACHE_FILTER_HIT, "rocksdb.block.cache.filter.hit"}, + {BLOCK_CACHE_FILTER_ADD, "rocksdb.block.cache.filter.add"}, + {BLOCK_CACHE_FILTER_BYTES_INSERT, + "rocksdb.block.cache.filter.bytes.insert"}, + {BLOCK_CACHE_FILTER_BYTES_EVICT, "rocksdb.block.cache.filter.bytes.evict"}, + {BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss"}, + {BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit"}, + {BLOCK_CACHE_DATA_ADD, "rocksdb.block.cache.data.add"}, + {BLOCK_CACHE_DATA_BYTES_INSERT, "rocksdb.block.cache.data.bytes.insert"}, + {BLOCK_CACHE_BYTES_READ, "rocksdb.block.cache.bytes.read"}, + {BLOCK_CACHE_BYTES_WRITE, "rocksdb.block.cache.bytes.write"}, + {BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful"}, + {BLOOM_FILTER_FULL_POSITIVE, "rocksdb.bloom.filter.full.positive"}, + {BLOOM_FILTER_FULL_TRUE_POSITIVE, + "rocksdb.bloom.filter.full.true.positive"}, + {BLOOM_FILTER_MICROS, "rocksdb.bloom.filter.micros"}, + {PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"}, + {PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"}, + {SIM_BLOCK_CACHE_HIT, "rocksdb.sim.block.cache.hit"}, + {SIM_BLOCK_CACHE_MISS, "rocksdb.sim.block.cache.miss"}, + {MEMTABLE_HIT, "rocksdb.memtable.hit"}, + {MEMTABLE_MISS, "rocksdb.memtable.miss"}, + {GET_HIT_L0, "rocksdb.l0.hit"}, + {GET_HIT_L1, "rocksdb.l1.hit"}, + {GET_HIT_L2_AND_UP, "rocksdb.l2andup.hit"}, + {COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new"}, + {COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete"}, + {COMPACTION_KEY_DROP_RANGE_DEL, "rocksdb.compaction.key.drop.range_del"}, + {COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user"}, + {COMPACTION_RANGE_DEL_DROP_OBSOLETE, + "rocksdb.compaction.range_del.drop.obsolete"}, + {COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, + "rocksdb.compaction.optimized.del.drop.obsolete"}, + {COMPACTION_CANCELLED, "rocksdb.compaction.cancelled"}, + {NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written"}, + {NUMBER_KEYS_READ, "rocksdb.number.keys.read"}, + {NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated"}, + {BYTES_WRITTEN, "rocksdb.bytes.written"}, + {BYTES_READ, "rocksdb.bytes.read"}, + {NUMBER_DB_SEEK, "rocksdb.number.db.seek"}, + {NUMBER_DB_NEXT, "rocksdb.number.db.next"}, + {NUMBER_DB_PREV, "rocksdb.number.db.prev"}, + {NUMBER_DB_SEEK_FOUND, "rocksdb.number.db.seek.found"}, + {NUMBER_DB_NEXT_FOUND, "rocksdb.number.db.next.found"}, + {NUMBER_DB_PREV_FOUND, "rocksdb.number.db.prev.found"}, + {ITER_BYTES_READ, "rocksdb.db.iter.bytes.read"}, + {NO_FILE_CLOSES, "rocksdb.no.file.closes"}, + {NO_FILE_OPENS, "rocksdb.no.file.opens"}, + {NO_FILE_ERRORS, "rocksdb.no.file.errors"}, + {STALL_L0_SLOWDOWN_MICROS, "rocksdb.l0.slowdown.micros"}, + {STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros"}, + {STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros"}, + {STALL_MICROS, "rocksdb.stall.micros"}, + {DB_MUTEX_WAIT_MICROS, "rocksdb.db.mutex.wait.micros"}, + {RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis"}, + {NO_ITERATORS, "rocksdb.num.iterators"}, + {NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get"}, + {NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read"}, + {NUMBER_MULTIGET_BYTES_READ, "rocksdb.number.multiget.bytes.read"}, + {NUMBER_FILTERED_DELETES, "rocksdb.number.deletes.filtered"}, + {NUMBER_MERGE_FAILURES, "rocksdb.number.merge.failures"}, + {BLOOM_FILTER_PREFIX_CHECKED, "rocksdb.bloom.filter.prefix.checked"}, + {BLOOM_FILTER_PREFIX_USEFUL, "rocksdb.bloom.filter.prefix.useful"}, + {NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration"}, + {GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"}, + {BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"}, + {BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"}, + {BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"}, + {BLOCK_CACHE_COMPRESSED_ADD_FAILURES, + "rocksdb.block.cachecompressed.add.failures"}, + {WAL_FILE_SYNCED, "rocksdb.wal.synced"}, + {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, + {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, + {WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, + {WRITE_TIMEDOUT, "rocksdb.write.timeout"}, + {WRITE_WITH_WAL, "rocksdb.write.wal"}, + {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, + {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, + {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"}, + {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, + "rocksdb.number.direct.load.table.properties"}, + {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"}, + {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"}, + {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"}, + {NUMBER_BLOCK_COMPRESSED, "rocksdb.number.block.compressed"}, + {NUMBER_BLOCK_DECOMPRESSED, "rocksdb.number.block.decompressed"}, + {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"}, + {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"}, + {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"}, + {ROW_CACHE_HIT, "rocksdb.row.cache.hit"}, + {ROW_CACHE_MISS, "rocksdb.row.cache.miss"}, + {READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"}, + {READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"}, + {NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"}, + {NUMBER_ITER_SKIP, "rocksdb.number.iter.skip"}, + {BLOB_DB_NUM_PUT, "rocksdb.blobdb.num.put"}, + {BLOB_DB_NUM_WRITE, "rocksdb.blobdb.num.write"}, + {BLOB_DB_NUM_GET, "rocksdb.blobdb.num.get"}, + {BLOB_DB_NUM_MULTIGET, "rocksdb.blobdb.num.multiget"}, + {BLOB_DB_NUM_SEEK, "rocksdb.blobdb.num.seek"}, + {BLOB_DB_NUM_NEXT, "rocksdb.blobdb.num.next"}, + {BLOB_DB_NUM_PREV, "rocksdb.blobdb.num.prev"}, + {BLOB_DB_NUM_KEYS_WRITTEN, "rocksdb.blobdb.num.keys.written"}, + {BLOB_DB_NUM_KEYS_READ, "rocksdb.blobdb.num.keys.read"}, + {BLOB_DB_BYTES_WRITTEN, "rocksdb.blobdb.bytes.written"}, + {BLOB_DB_BYTES_READ, "rocksdb.blobdb.bytes.read"}, + {BLOB_DB_WRITE_INLINED, "rocksdb.blobdb.write.inlined"}, + {BLOB_DB_WRITE_INLINED_TTL, "rocksdb.blobdb.write.inlined.ttl"}, + {BLOB_DB_WRITE_BLOB, "rocksdb.blobdb.write.blob"}, + {BLOB_DB_WRITE_BLOB_TTL, "rocksdb.blobdb.write.blob.ttl"}, + {BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"}, + {BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file.bytes.read"}, + {BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"}, + {BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, + "rocksdb.blobdb.blob.index.expired.count"}, + {BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, "rocksdb.blobdb.blob.index.expired.size"}, + {BLOB_DB_BLOB_INDEX_EVICTED_COUNT, + "rocksdb.blobdb.blob.index.evicted.count"}, + {BLOB_DB_BLOB_INDEX_EVICTED_SIZE, "rocksdb.blobdb.blob.index.evicted.size"}, + {BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"}, + {BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"}, + {BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"}, + {BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, "rocksdb.blobdb.gc.num.keys.overwritten"}, + {BLOB_DB_GC_NUM_KEYS_EXPIRED, "rocksdb.blobdb.gc.num.keys.expired"}, + {BLOB_DB_GC_NUM_KEYS_RELOCATED, "rocksdb.blobdb.gc.num.keys.relocated"}, + {BLOB_DB_GC_BYTES_OVERWRITTEN, "rocksdb.blobdb.gc.bytes.overwritten"}, + {BLOB_DB_GC_BYTES_EXPIRED, "rocksdb.blobdb.gc.bytes.expired"}, + {BLOB_DB_GC_BYTES_RELOCATED, "rocksdb.blobdb.gc.bytes.relocated"}, + {BLOB_DB_FIFO_NUM_FILES_EVICTED, "rocksdb.blobdb.fifo.num.files.evicted"}, + {BLOB_DB_FIFO_NUM_KEYS_EVICTED, "rocksdb.blobdb.fifo.num.keys.evicted"}, + {BLOB_DB_FIFO_BYTES_EVICTED, "rocksdb.blobdb.fifo.bytes.evicted"}, + {TXN_PREPARE_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.prepare"}, + {TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD, + "rocksdb.txn.overhead.mutex.old.commit.map"}, + {TXN_DUPLICATE_KEY_OVERHEAD, "rocksdb.txn.overhead.duplicate.key"}, + {TXN_SNAPSHOT_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.snapshot"}, + {TXN_GET_TRY_AGAIN, "rocksdb.txn.get.tryagain"}, + {NUMBER_MULTIGET_KEYS_FOUND, "rocksdb.number.multiget.keys.found"}, + {NO_ITERATOR_CREATED, "rocksdb.num.iterator.created"}, + {NO_ITERATOR_DELETED, "rocksdb.num.iterator.deleted"}, + {BLOCK_CACHE_COMPRESSION_DICT_MISS, + "rocksdb.block.cache.compression.dict.miss"}, + {BLOCK_CACHE_COMPRESSION_DICT_HIT, + "rocksdb.block.cache.compression.dict.hit"}, + {BLOCK_CACHE_COMPRESSION_DICT_ADD, + "rocksdb.block.cache.compression.dict.add"}, + {BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, + "rocksdb.block.cache.compression.dict.bytes.insert"}, + {BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, + "rocksdb.block.cache.compression.dict.bytes.evict"}, +}; + +const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = { + {DB_GET, "rocksdb.db.get.micros"}, + {DB_WRITE, "rocksdb.db.write.micros"}, + {COMPACTION_TIME, "rocksdb.compaction.times.micros"}, + {COMPACTION_CPU_TIME, "rocksdb.compaction.times.cpu_micros"}, + {SUBCOMPACTION_SETUP_TIME, "rocksdb.subcompaction.setup.times.micros"}, + {TABLE_SYNC_MICROS, "rocksdb.table.sync.micros"}, + {COMPACTION_OUTFILE_SYNC_MICROS, "rocksdb.compaction.outfile.sync.micros"}, + {WAL_FILE_SYNC_MICROS, "rocksdb.wal.file.sync.micros"}, + {MANIFEST_FILE_SYNC_MICROS, "rocksdb.manifest.file.sync.micros"}, + {TABLE_OPEN_IO_MICROS, "rocksdb.table.open.io.micros"}, + {DB_MULTIGET, "rocksdb.db.multiget.micros"}, + {READ_BLOCK_COMPACTION_MICROS, "rocksdb.read.block.compaction.micros"}, + {READ_BLOCK_GET_MICROS, "rocksdb.read.block.get.micros"}, + {WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros"}, + {STALL_L0_SLOWDOWN_COUNT, "rocksdb.l0.slowdown.count"}, + {STALL_MEMTABLE_COMPACTION_COUNT, "rocksdb.memtable.compaction.count"}, + {STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"}, + {HARD_RATE_LIMIT_DELAY_COUNT, "rocksdb.hard.rate.limit.delay.count"}, + {SOFT_RATE_LIMIT_DELAY_COUNT, "rocksdb.soft.rate.limit.delay.count"}, + {NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction"}, + {DB_SEEK, "rocksdb.db.seek.micros"}, + {WRITE_STALL, "rocksdb.db.write.stall"}, + {SST_READ_MICROS, "rocksdb.sst.read.micros"}, + {NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"}, + {BYTES_PER_READ, "rocksdb.bytes.per.read"}, + {BYTES_PER_WRITE, "rocksdb.bytes.per.write"}, + {BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"}, + {BYTES_COMPRESSED, "rocksdb.bytes.compressed"}, + {BYTES_DECOMPRESSED, "rocksdb.bytes.decompressed"}, + {COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"}, + {DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"}, + {READ_NUM_MERGE_OPERANDS, "rocksdb.read.num.merge_operands"}, + {BLOB_DB_KEY_SIZE, "rocksdb.blobdb.key.size"}, + {BLOB_DB_VALUE_SIZE, "rocksdb.blobdb.value.size"}, + {BLOB_DB_WRITE_MICROS, "rocksdb.blobdb.write.micros"}, + {BLOB_DB_GET_MICROS, "rocksdb.blobdb.get.micros"}, + {BLOB_DB_MULTIGET_MICROS, "rocksdb.blobdb.multiget.micros"}, + {BLOB_DB_SEEK_MICROS, "rocksdb.blobdb.seek.micros"}, + {BLOB_DB_NEXT_MICROS, "rocksdb.blobdb.next.micros"}, + {BLOB_DB_PREV_MICROS, "rocksdb.blobdb.prev.micros"}, + {BLOB_DB_BLOB_FILE_WRITE_MICROS, "rocksdb.blobdb.blob.file.write.micros"}, + {BLOB_DB_BLOB_FILE_READ_MICROS, "rocksdb.blobdb.blob.file.read.micros"}, + {BLOB_DB_BLOB_FILE_SYNC_MICROS, "rocksdb.blobdb.blob.file.sync.micros"}, + {BLOB_DB_GC_MICROS, "rocksdb.blobdb.gc.micros"}, + {BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"}, + {BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"}, + {FLUSH_TIME, "rocksdb.db.flush.micros"}, + {SST_BATCH_SIZE, "rocksdb.sst.batch.size"}, +}; + +std::shared_ptr<Statistics> CreateDBStatistics() { + return std::make_shared<StatisticsImpl>(nullptr); +} + +StatisticsImpl::StatisticsImpl(std::shared_ptr<Statistics> stats) + : stats_(std::move(stats)) {} + +StatisticsImpl::~StatisticsImpl() {} + +uint64_t StatisticsImpl::getTickerCount(uint32_t tickerType) const { + MutexLock lock(&aggregate_lock_); + return getTickerCountLocked(tickerType); +} + +uint64_t StatisticsImpl::getTickerCountLocked(uint32_t tickerType) const { + assert(tickerType < TICKER_ENUM_MAX); + uint64_t res = 0; + for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) { + res += per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType]; + } + return res; +} + +void StatisticsImpl::histogramData(uint32_t histogramType, + HistogramData* const data) const { + MutexLock lock(&aggregate_lock_); + getHistogramImplLocked(histogramType)->Data(data); +} + +std::unique_ptr<HistogramImpl> StatisticsImpl::getHistogramImplLocked( + uint32_t histogramType) const { + assert(histogramType < HISTOGRAM_ENUM_MAX); + std::unique_ptr<HistogramImpl> res_hist(new HistogramImpl()); + for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) { + res_hist->Merge( + per_core_stats_.AccessAtCore(core_idx)->histograms_[histogramType]); + } + return res_hist; +} + +std::string StatisticsImpl::getHistogramString(uint32_t histogramType) const { + MutexLock lock(&aggregate_lock_); + return getHistogramImplLocked(histogramType)->ToString(); +} + +void StatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) { + { + MutexLock lock(&aggregate_lock_); + setTickerCountLocked(tickerType, count); + } + if (stats_ && tickerType < TICKER_ENUM_MAX) { + stats_->setTickerCount(tickerType, count); + } +} + +void StatisticsImpl::setTickerCountLocked(uint32_t tickerType, uint64_t count) { + assert(tickerType < TICKER_ENUM_MAX); + for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) { + if (core_idx == 0) { + per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = count; + } else { + per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType] = 0; + } + } +} + +uint64_t StatisticsImpl::getAndResetTickerCount(uint32_t tickerType) { + uint64_t sum = 0; + { + MutexLock lock(&aggregate_lock_); + assert(tickerType < TICKER_ENUM_MAX); + for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) { + sum += + per_core_stats_.AccessAtCore(core_idx)->tickers_[tickerType].exchange( + 0, std::memory_order_relaxed); + } + } + if (stats_ && tickerType < TICKER_ENUM_MAX) { + stats_->setTickerCount(tickerType, 0); + } + return sum; +} + +void StatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) { + assert(tickerType < TICKER_ENUM_MAX); + per_core_stats_.Access()->tickers_[tickerType].fetch_add( + count, std::memory_order_relaxed); + if (stats_ && tickerType < TICKER_ENUM_MAX) { + stats_->recordTick(tickerType, count); + } +} + +void StatisticsImpl::recordInHistogram(uint32_t histogramType, uint64_t value) { + assert(histogramType < HISTOGRAM_ENUM_MAX); + if (get_stats_level() <= StatsLevel::kExceptHistogramOrTimers) { + return; + } + per_core_stats_.Access()->histograms_[histogramType].Add(value); + if (stats_ && histogramType < HISTOGRAM_ENUM_MAX) { + stats_->recordInHistogram(histogramType, value); + } +} + +Status StatisticsImpl::Reset() { + MutexLock lock(&aggregate_lock_); + for (uint32_t i = 0; i < TICKER_ENUM_MAX; ++i) { + setTickerCountLocked(i, 0); + } + for (uint32_t i = 0; i < HISTOGRAM_ENUM_MAX; ++i) { + for (size_t core_idx = 0; core_idx < per_core_stats_.Size(); ++core_idx) { + per_core_stats_.AccessAtCore(core_idx)->histograms_[i].Clear(); + } + } + return Status::OK(); +} + +namespace { + +// a buffer size used for temp string buffers +const int kTmpStrBufferSize = 200; + +} // namespace + +std::string StatisticsImpl::ToString() const { + MutexLock lock(&aggregate_lock_); + std::string res; + res.reserve(20000); + for (const auto& t : TickersNameMap) { + assert(t.first < TICKER_ENUM_MAX); + char buffer[kTmpStrBufferSize]; + snprintf(buffer, kTmpStrBufferSize, "%s COUNT : %" PRIu64 "\n", + t.second.c_str(), getTickerCountLocked(t.first)); + res.append(buffer); + } + for (const auto& h : HistogramsNameMap) { + assert(h.first < HISTOGRAM_ENUM_MAX); + char buffer[kTmpStrBufferSize]; + HistogramData hData; + getHistogramImplLocked(h.first)->Data(&hData); + // don't handle failures - buffer should always be big enough and arguments + // should be provided correctly + int ret = + snprintf(buffer, kTmpStrBufferSize, + "%s P50 : %f P95 : %f P99 : %f P100 : %f COUNT : %" PRIu64 + " SUM : %" PRIu64 "\n", + h.second.c_str(), hData.median, hData.percentile95, + hData.percentile99, hData.max, hData.count, hData.sum); + if (ret < 0 || ret >= kTmpStrBufferSize) { + assert(false); + continue; + } + res.append(buffer); + } + res.shrink_to_fit(); + return res; +} + +bool StatisticsImpl::getTickerMap( + std::map<std::string, uint64_t>* stats_map) const { + assert(stats_map); + if (!stats_map) return false; + stats_map->clear(); + MutexLock lock(&aggregate_lock_); + for (const auto& t : TickersNameMap) { + assert(t.first < TICKER_ENUM_MAX); + (*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first); + } + return true; +} + +bool StatisticsImpl::HistEnabledForType(uint32_t type) const { + return type < HISTOGRAM_ENUM_MAX; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/statistics.h b/src/rocksdb/monitoring/statistics.h new file mode 100644 index 000000000..f633aa4ef --- /dev/null +++ b/src/rocksdb/monitoring/statistics.h @@ -0,0 +1,138 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "rocksdb/statistics.h" + +#include <atomic> +#include <map> +#include <string> +#include <vector> + +#include "monitoring/histogram.h" +#include "port/likely.h" +#include "port/port.h" +#include "util/core_local.h" +#include "util/mutexlock.h" + +#ifdef __clang__ +#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__)) +#else +#define ROCKSDB_FIELD_UNUSED +#endif // __clang__ + +#ifndef STRINGIFY +#define STRINGIFY(x) #x +#define TOSTRING(x) STRINGIFY(x) +#endif + +namespace ROCKSDB_NAMESPACE { + +enum TickersInternal : uint32_t { + INTERNAL_TICKER_ENUM_START = TICKER_ENUM_MAX, + INTERNAL_TICKER_ENUM_MAX +}; + +enum HistogramsInternal : uint32_t { + INTERNAL_HISTOGRAM_START = HISTOGRAM_ENUM_MAX, + INTERNAL_HISTOGRAM_ENUM_MAX +}; + +class StatisticsImpl : public Statistics { + public: + StatisticsImpl(std::shared_ptr<Statistics> stats); + virtual ~StatisticsImpl(); + + virtual uint64_t getTickerCount(uint32_t ticker_type) const override; + virtual void histogramData(uint32_t histogram_type, + HistogramData* const data) const override; + std::string getHistogramString(uint32_t histogram_type) const override; + + virtual void setTickerCount(uint32_t ticker_type, uint64_t count) override; + virtual uint64_t getAndResetTickerCount(uint32_t ticker_type) override; + virtual void recordTick(uint32_t ticker_type, uint64_t count) override; + // The function is implemented for now for backward compatibility reason. + // In case a user explictly calls it, for example, they may have a wrapped + // Statistics object, passing the call to recordTick() into here, nothing + // will break. + void measureTime(uint32_t histogramType, uint64_t time) override { + recordInHistogram(histogramType, time); + } + virtual void recordInHistogram(uint32_t histogram_type, + uint64_t value) override; + + virtual Status Reset() override; + virtual std::string ToString() const override; + virtual bool getTickerMap(std::map<std::string, uint64_t>*) const override; + virtual bool HistEnabledForType(uint32_t type) const override; + + private: + // If non-nullptr, forwards updates to the object pointed to by `stats_`. + std::shared_ptr<Statistics> stats_; + // Synchronizes anything that operates across other cores' local data, + // such that operations like Reset() can be performed atomically. + mutable port::Mutex aggregate_lock_; + + // The ticker/histogram data are stored in this structure, which we will store + // per-core. It is cache-aligned, so tickers/histograms belonging to different + // cores can never share the same cache line. + // + // Alignment attributes expand to nothing depending on the platform + struct ALIGN_AS(CACHE_LINE_SIZE) StatisticsData { + std::atomic_uint_fast64_t tickers_[INTERNAL_TICKER_ENUM_MAX] = {{0}}; + HistogramImpl histograms_[INTERNAL_HISTOGRAM_ENUM_MAX]; +#ifndef HAVE_ALIGNED_NEW + char + padding[(CACHE_LINE_SIZE - + (INTERNAL_TICKER_ENUM_MAX * sizeof(std::atomic_uint_fast64_t) + + INTERNAL_HISTOGRAM_ENUM_MAX * sizeof(HistogramImpl)) % + CACHE_LINE_SIZE)] ROCKSDB_FIELD_UNUSED; +#endif + void *operator new(size_t s) { return port::cacheline_aligned_alloc(s); } + void *operator new[](size_t s) { return port::cacheline_aligned_alloc(s); } + void operator delete(void *p) { port::cacheline_aligned_free(p); } + void operator delete[](void *p) { port::cacheline_aligned_free(p); } + }; + + static_assert(sizeof(StatisticsData) % CACHE_LINE_SIZE == 0, "Expected " TOSTRING(CACHE_LINE_SIZE) "-byte aligned"); + + CoreLocalArray<StatisticsData> per_core_stats_; + + uint64_t getTickerCountLocked(uint32_t ticker_type) const; + std::unique_ptr<HistogramImpl> getHistogramImplLocked( + uint32_t histogram_type) const; + void setTickerCountLocked(uint32_t ticker_type, uint64_t count); +}; + +// Utility functions +inline void RecordInHistogram(Statistics* statistics, uint32_t histogram_type, + uint64_t value) { + if (statistics) { + statistics->recordInHistogram(histogram_type, value); + } +} + +inline void RecordTimeToHistogram(Statistics* statistics, + uint32_t histogram_type, uint64_t value) { + if (statistics) { + statistics->reportTimeToHistogram(histogram_type, value); + } +} + +inline void RecordTick(Statistics* statistics, uint32_t ticker_type, + uint64_t count = 1) { + if (statistics) { + statistics->recordTick(ticker_type, count); + } +} + +inline void SetTickerCount(Statistics* statistics, uint32_t ticker_type, + uint64_t count) { + if (statistics) { + statistics->setTickerCount(ticker_type, count); + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/statistics_test.cc b/src/rocksdb/monitoring/statistics_test.cc new file mode 100644 index 000000000..e497afcbe --- /dev/null +++ b/src/rocksdb/monitoring/statistics_test.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include "port/stack_trace.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +#include "rocksdb/statistics.h" + +namespace ROCKSDB_NAMESPACE { + +class StatisticsTest : public testing::Test {}; + +// Sanity check to make sure that contents and order of TickersNameMap +// match Tickers enum +TEST_F(StatisticsTest, SanityTickers) { + EXPECT_EQ(static_cast<size_t>(Tickers::TICKER_ENUM_MAX), + TickersNameMap.size()); + + for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) { + auto pair = TickersNameMap[static_cast<size_t>(t)]; + ASSERT_EQ(pair.first, t) << "Miss match at " << pair.second; + } +} + +// Sanity check to make sure that contents and order of HistogramsNameMap +// match Tickers enum +TEST_F(StatisticsTest, SanityHistograms) { + EXPECT_EQ(static_cast<size_t>(Histograms::HISTOGRAM_ENUM_MAX), + HistogramsNameMap.size()); + + for (uint32_t h = 0; h < Histograms::HISTOGRAM_ENUM_MAX; h++) { + auto pair = HistogramsNameMap[static_cast<size_t>(h)]; + ASSERT_EQ(pair.first, h) << "Miss match at " << pair.second; + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/monitoring/stats_history_test.cc b/src/rocksdb/monitoring/stats_history_test.cc new file mode 100644 index 000000000..56b8dc4f5 --- /dev/null +++ b/src/rocksdb/monitoring/stats_history_test.cc @@ -0,0 +1,653 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include <limits> +#include <string> +#include <unordered_map> + +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/db_test_util.h" +#include "monitoring/persistent_stats_history.h" +#include "options/options_helper.h" +#include "port/stack_trace.h" +#include "rocksdb/cache.h" +#include "rocksdb/convenience.h" +#include "rocksdb/rate_limiter.h" +#include "rocksdb/stats_history.h" +#include "test_util/sync_point.h" +#include "test_util/testutil.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +class StatsHistoryTest : public DBTestBase { + public: + StatsHistoryTest() : DBTestBase("/stats_history_test") {} +}; +#ifndef ROCKSDB_LITE + +TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { + Options options; + options.create_if_missing = true; + options.stats_dump_period_sec = 5; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + int counter = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast<uint64_t*>(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + + // Test cacel job through SetOptions + ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}})); + int old_val = counter; + for (int i = 6; i < 20; ++i) { + dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); }); + } + ASSERT_EQ(counter, old_val); + Close(); +} + +// Test persistent stats background thread scheduling and cancelling +TEST_F(StatsHistoryTest, StatsPersistScheduling) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast<uint64_t*>(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + int counter = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + + // Test cacel job through SetOptions + ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled()); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled()); + Close(); +} + +// Test enabling persistent stats for the first time +TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 0; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast<uint64_t*>(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000; + } + }); +#endif // OS_MACOSX && !NDEBUG + int counter = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); + ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + ASSERT_GE(counter, 1); + Close(); +} + +// TODO(Zhongyi): Move persistent stats related tests to a separate file +TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast<uint64_t*>(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + int mock_time = 1; + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + std::unique_ptr<StatsHistoryIterator> stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 6 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + // disabled stats snapshots + ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); + size_t stats_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5); + stats_count += stats_map.size(); + } + ASSERT_GT(stats_count, 0); + // Wait a bit and verify no more stats are found + for (mock_time = 6; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_new = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + stats_count_new += stats_iter->GetStatsMap().size(); + } + ASSERT_EQ(stats_count_new, stats_count); + Close(); +} + +TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { + Options options; + options.create_if_missing = true; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.stats_persist_period_sec = 1; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); +#if defined(OS_MACOSX) && !defined(NDEBUG) + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + uint64_t time_us = *reinterpret_cast<uint64_t*>(arg); + if (time_us < mock_env->RealNowMicros()) { + *reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); +#endif // OS_MACOSX && !NDEBUG + + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // some random operation to populate statistics + ASSERT_OK(Delete("foo")); + ASSERT_OK(Put("sol", "sol")); + ASSERT_OK(Put("epic", "epic")); + ASSERT_OK(Put("ltd", "ltd")); + ASSERT_EQ("sol", Get("sol")); + ASSERT_EQ("epic", Get("epic")); + ASSERT_EQ("ltd", Get("ltd")); + Iterator* iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + ASSERT_OK(Delete("sol")); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + int mock_time = 1; + // Wait for stats persist to finish + for (; mock_time < 5; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + + // second round of ops + ASSERT_OK(Put("saigon", "saigon")); + ASSERT_OK(Put("noodle talk", "noodle talk")); + ASSERT_OK(Put("ping bistro", "ping bistro")); + iterator = db_->NewIterator(ReadOptions()); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_TRUE(iterator->key() == iterator->value()); + } + delete iterator; + ASSERT_OK(Flush()); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + for (; mock_time < 10; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + std::unique_ptr<StatsHistoryIterator> stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 10 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count = 0; + int slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count += stats_map.size(); + } + size_t stats_history_size = dbfull()->TEST_EstimateInMemoryStatsHistorySize(); + ASSERT_GE(slice_count, 9); + ASSERT_GE(stats_history_size, 12000); + // capping memory cost at 12000 bytes since one slice is around 10000~12000 + ASSERT_OK(dbfull()->SetDBOptions({{"stats_history_buffer_size", "12000"}})); + ASSERT_EQ(12000, dbfull()->GetDBOptions().stats_history_buffer_size); + // Wait for stats persist to finish + for (; mock_time < 20; ++mock_time) { + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(mock_time); }); + } + db_->GetStatsHistory(0 /*start_time*/, 20 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_reopen = 0; + slice_count = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + stats_count_reopen += stats_map.size(); + } + size_t stats_history_size_reopen = + dbfull()->TEST_EstimateInMemoryStatsHistorySize(); + // only one slice can fit under the new stats_history_buffer_size + ASSERT_LT(slice_count, 2); + ASSERT_TRUE(stats_history_size_reopen < 12000 && + stats_history_size_reopen > 0); + ASSERT_TRUE(stats_count_reopen < stats_count && stats_count_reopen > 0); + Close(); + // TODO: may also want to verify stats timestamp to make sure we are purging + // the correct stats snapshot +} + +int countkeys(Iterator* iter) { + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + count++; + } + return count; +} + +TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ(Get("foo"), "bar"); + + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + auto iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count1 = countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(10); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count2 = countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(15); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count3 = countkeys(iter); + delete iter; + ASSERT_GE(key_count2, key_count1); + ASSERT_GE(key_count3, key_count2); + ASSERT_EQ(key_count3 - key_count2, key_count2 - key_count1); + std::unique_ptr<StatsHistoryIterator> stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count = 0; + int slice_count = 0; + int non_zero_count = 0; + for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) { + slice_count++; + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + for (auto& stat : stats_map) { + if (stat.second != 0) { + non_zero_count++; + } + } + stats_count += stats_map.size(); + } + ASSERT_EQ(slice_count, 3); + // 2 extra keys for format version + ASSERT_EQ(stats_count, key_count3 - 2); + // verify reopen will not cause data loss + ReopenWithColumnFamilies({"default", "pikachu"}, options); + db_->GetStatsHistory(0 /*start_time*/, 16 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + size_t stats_count_reopen = 0; + int slice_count_reopen = 0; + int non_zero_count_recover = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + slice_count_reopen++; + auto stats_map = stats_iter->GetStatsMap(); + for (auto& stat : stats_map) { + if (stat.second != 0) { + non_zero_count_recover++; + } + } + stats_count_reopen += stats_map.size(); + } + ASSERT_EQ(non_zero_count, non_zero_count_recover); + ASSERT_EQ(slice_count, slice_count_reopen); + ASSERT_EQ(stats_count, stats_count_reopen); + Close(); +} + +// Test persisted stats matches the value found in options.statistics and +// the stats value retains after DB reopen +TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + std::map<std::string, uint64_t> stats_map_before; + ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_before)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ASSERT_OK(Put("foo", "bar")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ(Get("foo"), "bar"); + + // Wait for stats persist to finish + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + auto iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(10); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(15); }); + iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + countkeys(iter); + delete iter; + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(20); }); + + std::map<std::string, uint64_t> stats_map_after; + ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_after)); + std::unique_ptr<StatsHistoryIterator> stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + std::string sample = "rocksdb.num.iterator.deleted"; + uint64_t recovered_value = 0; + for (int i = 1; stats_iter->Valid(); stats_iter->Next(), ++i) { + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + recovered_value += stat.second; + } + } + } + ASSERT_EQ(recovered_value, stats_map_after[sample]); + + // test stats value retains after recovery + ReopenWithColumnFamilies({"default", "pikachu"}, options); + db_->GetStatsHistory(0 /*start_time*/, 21 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + uint64_t new_recovered_value = 0; + for (int i = 1; stats_iter->Valid(); stats_iter->Next(), i++) { + auto stats_map = stats_iter->GetStatsMap(); + ASSERT_EQ(stats_iter->GetStatsTime(), 5 * i); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + new_recovered_value += stat.second; + } + } + } + ASSERT_EQ(recovered_value, new_recovered_value); + + // TODO(Zhongyi): also add test to read raw values from disk and verify + // correctness + Close(); +} + +// TODO(Zhongyi): add test for different format versions + +TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { + Options options; + options.create_if_missing = true; + options.stats_persist_period_sec = 5; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + ASSERT_OK(TryReopen(options)); + CreateColumnFamilies({"one", "two", "three"}, options); + ASSERT_OK(Put(1, "foo", "bar")); + ReopenWithColumnFamilies({"default", "one", "two", "three"}, options); + ASSERT_EQ(Get(2, "foo"), "bar"); + CreateColumnFamilies({"four"}, options); + ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); + ASSERT_EQ(Get(2, "foo"), "bar"); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + auto iter = + db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); + int key_count = countkeys(iter); + delete iter; + ASSERT_GE(key_count, 0); + uint64_t num_write_wal = 0; + std::string sample = "rocksdb.write.wal"; + std::unique_ptr<StatsHistoryIterator> stats_iter; + db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + num_write_wal += stat.second; + } + } + } + stats_iter.reset(); + ASSERT_EQ(num_write_wal, 2); + + options.persist_stats_to_disk = false; + ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); + int cf_count = 0; + for (auto cfd : *dbfull()->versions_->GetColumnFamilySet()) { + (void)cfd; + cf_count++; + } + // persistent stats cf will be implicitly opened even if + // persist_stats_to_disk is false + ASSERT_EQ(cf_count, 6); + ASSERT_EQ(Get(2, "foo"), "bar"); + + // attempt to create column family using same name, should fail + ColumnFamilyOptions cf_opts(options); + ColumnFamilyHandle* handle; + ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName, + &handle)); + + options.persist_stats_to_disk = true; + ReopenWithColumnFamilies({"default", "one", "two", "three", "four"}, options); + ASSERT_NOK(db_->CreateColumnFamily(cf_opts, kPersistentStatsColumnFamilyName, + &handle)); + // verify stats is not affected by prior failed CF creation + db_->GetStatsHistory(0 /*start_time*/, 5 /*end_time*/, &stats_iter); + ASSERT_TRUE(stats_iter != nullptr); + num_write_wal = 0; + for (; stats_iter->Valid(); stats_iter->Next()) { + auto stats_map = stats_iter->GetStatsMap(); + for (const auto& stat : stats_map) { + if (sample.compare(stat.first) == 0) { + num_write_wal += stat.second; + } + } + } + ASSERT_EQ(num_write_wal, 2); + + Close(); + Destroy(options); +} + +TEST_F(StatsHistoryTest, PersistentStatsReadOnly) { + ASSERT_OK(Put("bar", "v2")); + Close(); + + auto options = CurrentOptions(); + options.stats_persist_period_sec = 5; + options.persist_stats_to_disk = true; + assert(options.env == env_); + ASSERT_OK(ReadOnlyReopen(options)); + ASSERT_EQ("v2", Get("bar")); + Close(); + + // Reopen and flush memtable. + ASSERT_OK(TryReopen(options)); + Flush(); + Close(); + // Now check keys in read only mode. + ASSERT_OK(ReadOnlyReopen(options)); +} + +TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { + Options options; + options.create_if_missing = true; + options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb + options.stats_persist_period_sec = 5; + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; + mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ColumnFamilyData* cfd_default = + static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily()) + ->cfd(); + ColumnFamilyData* cfd_stats = static_cast<ColumnFamilyHandleImpl*>( + dbfull()->PersistentStatsColumnFamily()) + ->cfd(); + ColumnFamilyData* cfd_test = + static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd(); + + ASSERT_OK(Put("foo", "v0")); + ASSERT_OK(Put("bar", "v0")); + ASSERT_EQ("v0", Get("bar")); + ASSERT_EQ("v0", Get("foo")); + ASSERT_OK(Put(1, "Eevee", "v0")); + ASSERT_EQ("v0", Get(1, "Eevee")); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + // writing to all three cf, flush default cf + // LogNumbers: default: 14, stats: 4, pikachu: 4 + ASSERT_OK(Flush()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + + ASSERT_OK(Put("foo1", "v1")); + ASSERT_OK(Put("bar1", "v1")); + ASSERT_EQ("v1", Get("bar1")); + ASSERT_EQ("v1", Get("foo1")); + ASSERT_OK(Put(1, "Vaporeon", "v1")); + ASSERT_EQ("v1", Get(1, "Vaporeon")); + // writing to default and test cf, flush test cf + // LogNumbers: default: 14, stats: 16, pikachu: 16 + ASSERT_OK(Flush(1)); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_GT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + + ASSERT_OK(Put("foo2", "v2")); + ASSERT_OK(Put("bar2", "v2")); + ASSERT_EQ("v2", Get("bar2")); + ASSERT_EQ("v2", Get("foo2")); + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(10); }); + // writing to default and stats cf, flushing default cf + // LogNumbers: default: 19, stats: 19, pikachu: 19 + ASSERT_OK(Flush()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + + ASSERT_OK(Put("foo3", "v3")); + ASSERT_OK(Put("bar3", "v3")); + ASSERT_EQ("v3", Get("bar3")); + ASSERT_EQ("v3", Get("foo3")); + ASSERT_OK(Put(1, "Jolteon", "v3")); + ASSERT_EQ("v3", Get(1, "Jolteon")); + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(15); }); + // writing to all three cf, flushing test cf + // LogNumbers: default: 19, stats: 19, pikachu: 22 + ASSERT_OK(Flush(1)); + ASSERT_LT(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + Close(); +} + +#endif // !ROCKSDB_LITE +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/monitoring/thread_status_impl.cc b/src/rocksdb/monitoring/thread_status_impl.cc new file mode 100644 index 000000000..9619dfd81 --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_impl.cc @@ -0,0 +1,163 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include <sstream> + +#include "rocksdb/env.h" +#include "rocksdb/thread_status.h" +#include "util/string_util.h" +#include "util/thread_operation.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef ROCKSDB_USING_THREAD_STATUS +std::string ThreadStatus::GetThreadTypeName( + ThreadStatus::ThreadType thread_type) { + switch (thread_type) { + case ThreadStatus::ThreadType::HIGH_PRIORITY: + return "High Pri"; + case ThreadStatus::ThreadType::LOW_PRIORITY: + return "Low Pri"; + case ThreadStatus::ThreadType::USER: + return "User"; + case ThreadStatus::ThreadType::BOTTOM_PRIORITY: + return "Bottom Pri"; + case ThreadStatus::ThreadType::NUM_THREAD_TYPES: + assert(false); + } + return "Unknown"; +} + +const std::string& ThreadStatus::GetOperationName( + ThreadStatus::OperationType op_type) { + if (op_type < 0 || op_type >= NUM_OP_TYPES) { + return global_operation_table[OP_UNKNOWN].name; + } + return global_operation_table[op_type].name; +} + +const std::string& ThreadStatus::GetOperationStageName( + ThreadStatus::OperationStage stage) { + if (stage < 0 || stage >= NUM_OP_STAGES) { + return global_op_stage_table[STAGE_UNKNOWN].name; + } + return global_op_stage_table[stage].name; +} + +const std::string& ThreadStatus::GetStateName( + ThreadStatus::StateType state_type) { + if (state_type < 0 || state_type >= NUM_STATE_TYPES) { + return global_state_table[STATE_UNKNOWN].name; + } + return global_state_table[state_type].name; +} + +const std::string ThreadStatus::MicrosToString(uint64_t micros) { + if (micros == 0) { + return ""; + } + const int kBufferLen = 100; + char buffer[kBufferLen]; + AppendHumanMicros(micros, buffer, kBufferLen, false); + return std::string(buffer); +} + +const std::string& ThreadStatus::GetOperationPropertyName( + ThreadStatus::OperationType op_type, int i) { + static const std::string empty_str = ""; + switch (op_type) { + case ThreadStatus::OP_COMPACTION: + if (i >= NUM_COMPACTION_PROPERTIES) { + return empty_str; + } + return compaction_operation_properties[i].name; + case ThreadStatus::OP_FLUSH: + if (i >= NUM_FLUSH_PROPERTIES) { + return empty_str; + } + return flush_operation_properties[i].name; + default: + return empty_str; + } +} + +std::map<std::string, uint64_t> ThreadStatus::InterpretOperationProperties( + ThreadStatus::OperationType op_type, const uint64_t* op_properties) { + int num_properties; + switch (op_type) { + case OP_COMPACTION: + num_properties = NUM_COMPACTION_PROPERTIES; + break; + case OP_FLUSH: + num_properties = NUM_FLUSH_PROPERTIES; + break; + default: + num_properties = 0; + } + + std::map<std::string, uint64_t> property_map; + for (int i = 0; i < num_properties; ++i) { + if (op_type == OP_COMPACTION && i == COMPACTION_INPUT_OUTPUT_LEVEL) { + property_map.insert({"BaseInputLevel", op_properties[i] >> 32}); + property_map.insert( + {"OutputLevel", op_properties[i] % (uint64_t(1) << 32U)}); + } else if (op_type == OP_COMPACTION && i == COMPACTION_PROP_FLAGS) { + property_map.insert({"IsManual", ((op_properties[i] & 2) >> 1)}); + property_map.insert({"IsDeletion", ((op_properties[i] & 4) >> 2)}); + property_map.insert({"IsTrivialMove", ((op_properties[i] & 8) >> 3)}); + } else { + property_map.insert( + {GetOperationPropertyName(op_type, i), op_properties[i]}); + } + } + return property_map; +} + +#else + +std::string ThreadStatus::GetThreadTypeName( + ThreadStatus::ThreadType /*thread_type*/) { + static std::string dummy_str = ""; + return dummy_str; +} + +const std::string& ThreadStatus::GetOperationName( + ThreadStatus::OperationType /*op_type*/) { + static std::string dummy_str = ""; + return dummy_str; +} + +const std::string& ThreadStatus::GetOperationStageName( + ThreadStatus::OperationStage /*stage*/) { + static std::string dummy_str = ""; + return dummy_str; +} + +const std::string& ThreadStatus::GetStateName( + ThreadStatus::StateType /*state_type*/) { + static std::string dummy_str = ""; + return dummy_str; +} + +const std::string ThreadStatus::MicrosToString(uint64_t /*op_elapsed_time*/) { + static std::string dummy_str = ""; + return dummy_str; +} + +const std::string& ThreadStatus::GetOperationPropertyName( + ThreadStatus::OperationType /*op_type*/, int /*i*/) { + static std::string dummy_str = ""; + return dummy_str; +} + +std::map<std::string, uint64_t> ThreadStatus::InterpretOperationProperties( + ThreadStatus::OperationType /*op_type*/, + const uint64_t* /*op_properties*/) { + return std::map<std::string, uint64_t>(); +} + +#endif // ROCKSDB_USING_THREAD_STATUS +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/thread_status_updater.cc b/src/rocksdb/monitoring/thread_status_updater.cc new file mode 100644 index 000000000..7e4b299a8 --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_updater.cc @@ -0,0 +1,314 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "monitoring/thread_status_updater.h" +#include <memory> +#include "port/likely.h" +#include "rocksdb/env.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef ROCKSDB_USING_THREAD_STATUS + +__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; + +void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype, + uint64_t thread_id) { + if (UNLIKELY(thread_status_data_ == nullptr)) { + thread_status_data_ = new ThreadStatusData(); + thread_status_data_->thread_type = ttype; + thread_status_data_->thread_id = thread_id; + std::lock_guard<std::mutex> lck(thread_list_mutex_); + thread_data_set_.insert(thread_status_data_); + } + + ClearThreadOperationProperties(); +} + +void ThreadStatusUpdater::UnregisterThread() { + if (thread_status_data_ != nullptr) { + std::lock_guard<std::mutex> lck(thread_list_mutex_); + thread_data_set_.erase(thread_status_data_); + delete thread_status_data_; + thread_status_data_ = nullptr; + } +} + +void ThreadStatusUpdater::ResetThreadStatus() { + ClearThreadState(); + ClearThreadOperation(); + SetColumnFamilyInfoKey(nullptr); +} + +void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) { + auto* data = Get(); + if (data == nullptr) { + return; + } + // set the tracking flag based on whether cf_key is non-null or not. + // If enable_thread_tracking is set to false, the input cf_key + // would be nullptr. + data->enable_tracking = (cf_key != nullptr); + data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed); +} + +const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return nullptr; + } + return data->cf_key.load(std::memory_order_relaxed); +} + +void ThreadStatusUpdater::SetThreadOperation( + const ThreadStatus::OperationType type) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + // NOTE: Our practice here is to set all the thread operation properties + // and stage before we set thread operation, and thread operation + // will be set in std::memory_order_release. This is to ensure + // whenever a thread operation is not OP_UNKNOWN, we will always + // have a consistent information on its properties. + data->operation_type.store(type, std::memory_order_release); + if (type == ThreadStatus::OP_UNKNOWN) { + data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, + std::memory_order_relaxed); + ClearThreadOperationProperties(); + } +} + +void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + data->op_properties[i].store(value, std::memory_order_relaxed); +} + +void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i, + uint64_t delta) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + data->op_properties[i].fetch_add(delta, std::memory_order_relaxed); +} + +void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + data->op_start_time.store(start_time, std::memory_order_relaxed); +} + +void ThreadStatusUpdater::ClearThreadOperation() { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, + std::memory_order_relaxed); + data->operation_type.store(ThreadStatus::OP_UNKNOWN, + std::memory_order_relaxed); + ClearThreadOperationProperties(); +} + +void ThreadStatusUpdater::ClearThreadOperationProperties() { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { + data->op_properties[i].store(0, std::memory_order_relaxed); + } +} + +ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( + ThreadStatus::OperationStage stage) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return ThreadStatus::STAGE_UNKNOWN; + } + return data->operation_stage.exchange(stage, std::memory_order_relaxed); +} + +void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + data->state_type.store(type, std::memory_order_relaxed); +} + +void ThreadStatusUpdater::ClearThreadState() { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { + return; + } + data->state_type.store(ThreadStatus::STATE_UNKNOWN, + std::memory_order_relaxed); +} + +Status ThreadStatusUpdater::GetThreadList( + std::vector<ThreadStatus>* thread_list) { + thread_list->clear(); + std::vector<std::shared_ptr<ThreadStatusData>> valid_list; + uint64_t now_micros = Env::Default()->NowMicros(); + + std::lock_guard<std::mutex> lck(thread_list_mutex_); + for (auto* thread_data : thread_data_set_) { + assert(thread_data); + auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed); + auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed); + // Since any change to cf_info_map requires thread_list_mutex, + // which is currently held by GetThreadList(), here we can safely + // use "memory_order_relaxed" to load the cf_key. + auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed); + + ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN; + ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN; + ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN; + uint64_t op_elapsed_micros = 0; + uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0}; + + auto iter = cf_info_map_.find(cf_key); + if (iter != cf_info_map_.end()) { + op_type = thread_data->operation_type.load(std::memory_order_acquire); + // display lower-level info only when higher-level info is available. + if (op_type != ThreadStatus::OP_UNKNOWN) { + op_elapsed_micros = now_micros - thread_data->op_start_time.load( + std::memory_order_relaxed); + op_stage = thread_data->operation_stage.load(std::memory_order_relaxed); + state_type = thread_data->state_type.load(std::memory_order_relaxed); + for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { + op_props[i] = + thread_data->op_properties[i].load(std::memory_order_relaxed); + } + } + } + + thread_list->emplace_back( + thread_id, thread_type, + iter != cf_info_map_.end() ? iter->second.db_name : "", + iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type, + op_elapsed_micros, op_stage, op_props, state_type); + } + + return Status::OK(); +} + +ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() { + if (thread_status_data_ == nullptr) { + return nullptr; + } + if (!thread_status_data_->enable_tracking) { + assert(thread_status_data_->cf_key.load(std::memory_order_relaxed) == + nullptr); + return nullptr; + } + return thread_status_data_; +} + +void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key, + const std::string& db_name, + const void* cf_key, + const std::string& cf_name) { + // Acquiring same lock as GetThreadList() to guarantee + // a consistent view of global column family table (cf_info_map). + std::lock_guard<std::mutex> lck(thread_list_mutex_); + + cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key), + std::make_tuple(db_key, db_name, cf_name)); + db_key_map_[db_key].insert(cf_key); +} + +void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { + // Acquiring same lock as GetThreadList() to guarantee + // a consistent view of global column family table (cf_info_map). + std::lock_guard<std::mutex> lck(thread_list_mutex_); + + auto cf_pair = cf_info_map_.find(cf_key); + if (cf_pair != cf_info_map_.end()) { + // Remove its entry from db_key_map_ by the following steps: + // 1. Obtain the entry in db_key_map_ whose set contains cf_key + // 2. Remove it from the set. + ConstantColumnFamilyInfo& cf_info = cf_pair->second; + auto db_pair = db_key_map_.find(cf_info.db_key); + assert(db_pair != db_key_map_.end()); + size_t result __attribute__((__unused__)); + result = db_pair->second.erase(cf_key); + assert(result); + cf_info_map_.erase(cf_pair); + } +} + +void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { + // Acquiring same lock as GetThreadList() to guarantee + // a consistent view of global column family table (cf_info_map). + std::lock_guard<std::mutex> lck(thread_list_mutex_); + auto db_pair = db_key_map_.find(db_key); + if (UNLIKELY(db_pair == db_key_map_.end())) { + // In some occasional cases such as DB::Open fails, we won't + // register ColumnFamilyInfo for a db. + return; + } + + for (auto cf_key : db_pair->second) { + auto cf_pair = cf_info_map_.find(cf_key); + if (cf_pair != cf_info_map_.end()) { + cf_info_map_.erase(cf_pair); + } + } + db_key_map_.erase(db_key); +} + +#else + +void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/, + uint64_t /*thread_id*/) {} + +void ThreadStatusUpdater::UnregisterThread() {} + +void ThreadStatusUpdater::ResetThreadStatus() {} + +void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {} + +void ThreadStatusUpdater::SetThreadOperation( + const ThreadStatus::OperationType /*type*/) {} + +void ThreadStatusUpdater::ClearThreadOperation() {} + +void ThreadStatusUpdater::SetThreadState( + const ThreadStatus::StateType /*type*/) {} + +void ThreadStatusUpdater::ClearThreadState() {} + +Status ThreadStatusUpdater::GetThreadList( + std::vector<ThreadStatus>* /*thread_list*/) { + return Status::NotSupported( + "GetThreadList is not supported in the current running environment."); +} + +void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/, + const std::string& /*db_name*/, + const void* /*cf_key*/, + const std::string& /*cf_name*/) {} + +void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {} + +void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {} + +void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/, + uint64_t /*value*/) {} + +void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/, + uint64_t /*delta*/) {} + +#endif // ROCKSDB_USING_THREAD_STATUS +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/thread_status_updater.h b/src/rocksdb/monitoring/thread_status_updater.h new file mode 100644 index 000000000..ac8c0f35d --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_updater.h @@ -0,0 +1,233 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// The implementation of ThreadStatus. +// +// Note that we make get and set access to ThreadStatusData lockless. +// As a result, ThreadStatusData as a whole is not atomic. However, +// we guarantee consistent ThreadStatusData all the time whenever +// user call GetThreadList(). This consistency guarantee is done +// by having the following constraint in the internal implementation +// of set and get order: +// +// 1. When reset any information in ThreadStatusData, always start from +// clearing up the lower-level information first. +// 2. When setting any information in ThreadStatusData, always start from +// setting the higher-level information. +// 3. When returning ThreadStatusData to the user, fields are fetched from +// higher-level to lower-level. In addition, where there's a nullptr +// in one field, then all fields that has lower-level than that field +// should be ignored. +// +// The high to low level information would be: +// thread_id > thread_type > db > cf > operation > state +// +// This means user might not always get full information, but whenever +// returned by the GetThreadList() is guaranteed to be consistent. +#pragma once +#include <atomic> +#include <list> +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "rocksdb/status.h" +#include "rocksdb/thread_status.h" +#include "port/port.h" +#include "util/thread_operation.h" + +namespace ROCKSDB_NAMESPACE { + +class ColumnFamilyHandle; + +// The structure that keeps constant information about a column family. +struct ConstantColumnFamilyInfo { +#ifdef ROCKSDB_USING_THREAD_STATUS + public: + ConstantColumnFamilyInfo( + const void* _db_key, + const std::string& _db_name, + const std::string& _cf_name) : + db_key(_db_key), db_name(_db_name), cf_name(_cf_name) {} + const void* db_key; + const std::string db_name; + const std::string cf_name; +#endif // ROCKSDB_USING_THREAD_STATUS +}; + +// the internal data-structure that is used to reflect the current +// status of a thread using a set of atomic pointers. +struct ThreadStatusData { +#ifdef ROCKSDB_USING_THREAD_STATUS + explicit ThreadStatusData() : enable_tracking(false) { + thread_id.store(0); + thread_type.store(ThreadStatus::USER); + cf_key.store(nullptr); + operation_type.store(ThreadStatus::OP_UNKNOWN); + op_start_time.store(0); + state_type.store(ThreadStatus::STATE_UNKNOWN); + } + + // A flag to indicate whether the thread tracking is enabled + // in the current thread. This value will be updated based on whether + // the associated Options::enable_thread_tracking is set to true + // in ThreadStatusUtil::SetColumnFamily(). + // + // If set to false, then SetThreadOperation and SetThreadState + // will be no-op. + bool enable_tracking; + + std::atomic<uint64_t> thread_id; + std::atomic<ThreadStatus::ThreadType> thread_type; + std::atomic<void*> cf_key; + std::atomic<ThreadStatus::OperationType> operation_type; + std::atomic<uint64_t> op_start_time; + std::atomic<ThreadStatus::OperationStage> operation_stage; + std::atomic<uint64_t> op_properties[ThreadStatus::kNumOperationProperties]; + std::atomic<ThreadStatus::StateType> state_type; +#endif // ROCKSDB_USING_THREAD_STATUS +}; + +// The class that stores and updates the status of the current thread +// using a thread-local ThreadStatusData. +// +// In most of the case, you should use ThreadStatusUtil to update +// the status of the current thread instead of using ThreadSatusUpdater +// directly. +// +// @see ThreadStatusUtil +class ThreadStatusUpdater { + public: + ThreadStatusUpdater() {} + + // Releases all ThreadStatusData of all active threads. + virtual ~ThreadStatusUpdater() {} + + // Unregister the current thread. + void UnregisterThread(); + + // Reset the status of the current thread. This includes resetting + // ColumnFamilyInfoKey, ThreadOperation, and ThreadState. + void ResetThreadStatus(); + + // Set the id of the current thread. + void SetThreadID(uint64_t thread_id); + + // Register the current thread for tracking. + void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id); + + // Update the column-family info of the current thread by setting + // its thread-local pointer of ThreadStateInfo to the correct entry. + void SetColumnFamilyInfoKey(const void* cf_key); + + // returns the column family info key. + const void* GetColumnFamilyInfoKey(); + + // Update the thread operation of the current thread. + void SetThreadOperation(const ThreadStatus::OperationType type); + + // The start time of the current thread operation. It is in the format + // of micro-seconds since some fixed point in time. + void SetOperationStartTime(const uint64_t start_time); + + // Set the "i"th property of the current operation. + // + // NOTE: Our practice here is to set all the thread operation properties + // and stage before we set thread operation, and thread operation + // will be set in std::memory_order_release. This is to ensure + // whenever a thread operation is not OP_UNKNOWN, we will always + // have a consistent information on its properties. + void SetThreadOperationProperty( + int i, uint64_t value); + + // Increase the "i"th property of the current operation with + // the specified delta. + void IncreaseThreadOperationProperty( + int i, uint64_t delta); + + // Update the thread operation stage of the current thread. + ThreadStatus::OperationStage SetThreadOperationStage( + const ThreadStatus::OperationStage stage); + + // Clear thread operation of the current thread. + void ClearThreadOperation(); + + // Reset all thread-operation-properties to 0. + void ClearThreadOperationProperties(); + + // Update the thread state of the current thread. + void SetThreadState(const ThreadStatus::StateType type); + + // Clear the thread state of the current thread. + void ClearThreadState(); + + // Obtain the status of all active registered threads. + Status GetThreadList( + std::vector<ThreadStatus>* thread_list); + + // Create an entry in the global ColumnFamilyInfo table for the + // specified column family. This function should be called only + // when the current thread does not hold db_mutex. + void NewColumnFamilyInfo( + const void* db_key, const std::string& db_name, + const void* cf_key, const std::string& cf_name); + + // Erase all ConstantColumnFamilyInfo that is associated with the + // specified db instance. This function should be called only when + // the current thread does not hold db_mutex. + void EraseDatabaseInfo(const void* db_key); + + // Erase the ConstantColumnFamilyInfo that is associated with the + // specified ColumnFamilyData. This function should be called only + // when the current thread does not hold db_mutex. + void EraseColumnFamilyInfo(const void* cf_key); + + // Verifies whether the input ColumnFamilyHandles matches + // the information stored in the current cf_info_map. + void TEST_VerifyColumnFamilyInfoMap( + const std::vector<ColumnFamilyHandle*>& handles, + bool check_exist); + + protected: +#ifdef ROCKSDB_USING_THREAD_STATUS + // The thread-local variable for storing thread status. + static __thread ThreadStatusData* thread_status_data_; + + // Returns the pointer to the thread status data only when the + // thread status data is non-null and has enable_tracking == true. + ThreadStatusData* GetLocalThreadStatus(); + + // Directly returns the pointer to thread_status_data_ without + // checking whether enabling_tracking is true of not. + ThreadStatusData* Get() { + return thread_status_data_; + } + + // The mutex that protects cf_info_map and db_key_map. + std::mutex thread_list_mutex_; + + // The current status data of all active threads. + std::unordered_set<ThreadStatusData*> thread_data_set_; + + // A global map that keeps the column family information. It is stored + // globally instead of inside DB is to avoid the situation where DB is + // closing while GetThreadList function already get the pointer to its + // CopnstantColumnFamilyInfo. + std::unordered_map<const void*, ConstantColumnFamilyInfo> cf_info_map_; + + // A db_key to cf_key map that allows erasing elements in cf_info_map + // associated to the same db_key faster. + std::unordered_map< + const void*, std::unordered_set<const void*>> db_key_map_; + +#else + static ThreadStatusData* thread_status_data_; +#endif // ROCKSDB_USING_THREAD_STATUS +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/thread_status_updater_debug.cc b/src/rocksdb/monitoring/thread_status_updater_debug.cc new file mode 100644 index 000000000..5937d91fa --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_updater_debug.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <mutex> + +#include "db/column_family.h" +#include "monitoring/thread_status_updater.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef NDEBUG +#ifdef ROCKSDB_USING_THREAD_STATUS +void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( + const std::vector<ColumnFamilyHandle*>& handles, bool check_exist) { + std::unique_lock<std::mutex> lock(thread_list_mutex_); + if (check_exist) { + assert(cf_info_map_.size() == handles.size()); + } + for (auto* handle : handles) { + auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd(); + auto iter __attribute__((__unused__)) = cf_info_map_.find(cfd); + if (check_exist) { + assert(iter != cf_info_map_.end()); + assert(iter->second.cf_name == cfd->GetName()); + } else { + assert(iter == cf_info_map_.end()); + } + } +} + +#else + +void ThreadStatusUpdater::TEST_VerifyColumnFamilyInfoMap( + const std::vector<ColumnFamilyHandle*>& /*handles*/, bool /*check_exist*/) { +} + +#endif // ROCKSDB_USING_THREAD_STATUS +#endif // !NDEBUG + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/thread_status_util.cc b/src/rocksdb/monitoring/thread_status_util.cc new file mode 100644 index 000000000..13a79163c --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_util.cc @@ -0,0 +1,206 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "monitoring/thread_status_util.h" + +#include "monitoring/thread_status_updater.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef ROCKSDB_USING_THREAD_STATUS +__thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = + nullptr; +__thread bool ThreadStatusUtil::thread_updater_initialized_ = false; + +void ThreadStatusUtil::RegisterThread(const Env* env, + ThreadStatus::ThreadType thread_type) { + if (!MaybeInitThreadLocalUpdater(env)) { + return; + } + assert(thread_updater_local_cache_); + thread_updater_local_cache_->RegisterThread(thread_type, env->GetThreadID()); +} + +void ThreadStatusUtil::UnregisterThread() { + thread_updater_initialized_ = false; + if (thread_updater_local_cache_ != nullptr) { + thread_updater_local_cache_->UnregisterThread(); + thread_updater_local_cache_ = nullptr; + } +} + +void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* cfd, + const Env* env, + bool enable_thread_tracking) { + if (!MaybeInitThreadLocalUpdater(env)) { + return; + } + assert(thread_updater_local_cache_); + if (cfd != nullptr && enable_thread_tracking) { + thread_updater_local_cache_->SetColumnFamilyInfoKey(cfd); + } else { + // When cfd == nullptr or enable_thread_tracking == false, we set + // ColumnFamilyInfoKey to nullptr, which makes SetThreadOperation + // and SetThreadState become no-op. + thread_updater_local_cache_->SetColumnFamilyInfoKey(nullptr); + } +} + +void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType op) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + if (op != ThreadStatus::OP_UNKNOWN) { + uint64_t current_time = Env::Default()->NowMicros(); + thread_updater_local_cache_->SetOperationStartTime(current_time); + } else { + // TDOO(yhchiang): we could report the time when we set operation to + // OP_UNKNOWN once the whole instrumentation has been done. + thread_updater_local_cache_->SetOperationStartTime(0); + } + thread_updater_local_cache_->SetThreadOperation(op); +} + +ThreadStatus::OperationStage ThreadStatusUtil::SetThreadOperationStage( + ThreadStatus::OperationStage stage) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return ThreadStatus::STAGE_UNKNOWN; + } + + return thread_updater_local_cache_->SetThreadOperationStage(stage); +} + +void ThreadStatusUtil::SetThreadOperationProperty(int code, uint64_t value) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->SetThreadOperationProperty(code, value); +} + +void ThreadStatusUtil::IncreaseThreadOperationProperty(int code, + uint64_t delta) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->IncreaseThreadOperationProperty(code, delta); +} + +void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType state) { + if (thread_updater_local_cache_ == nullptr) { + // thread_updater_local_cache_ must be set in SetColumnFamily + // or other ThreadStatusUtil functions. + return; + } + + thread_updater_local_cache_->SetThreadState(state); +} + +void ThreadStatusUtil::ResetThreadStatus() { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->ResetThreadStatus(); +} + +void ThreadStatusUtil::NewColumnFamilyInfo(const DB* db, + const ColumnFamilyData* cfd, + const std::string& cf_name, + const Env* env) { + if (!MaybeInitThreadLocalUpdater(env)) { + return; + } + assert(thread_updater_local_cache_); + if (thread_updater_local_cache_) { + thread_updater_local_cache_->NewColumnFamilyInfo(db, db->GetName(), cfd, + cf_name); + } +} + +void ThreadStatusUtil::EraseColumnFamilyInfo(const ColumnFamilyData* cfd) { + if (thread_updater_local_cache_ == nullptr) { + return; + } + thread_updater_local_cache_->EraseColumnFamilyInfo(cfd); +} + +void ThreadStatusUtil::EraseDatabaseInfo(const DB* db) { + ThreadStatusUpdater* thread_updater = db->GetEnv()->GetThreadStatusUpdater(); + if (thread_updater == nullptr) { + return; + } + thread_updater->EraseDatabaseInfo(db); +} + +bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* env) { + if (!thread_updater_initialized_ && env != nullptr) { + thread_updater_initialized_ = true; + thread_updater_local_cache_ = env->GetThreadStatusUpdater(); + } + return (thread_updater_local_cache_ != nullptr); +} + +AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater( + ThreadStatus::OperationStage stage) { + prev_stage_ = ThreadStatusUtil::SetThreadOperationStage(stage); +} + +AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() { + ThreadStatusUtil::SetThreadOperationStage(prev_stage_); +} + +#else + +ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; +bool ThreadStatusUtil::thread_updater_initialized_ = false; + +bool ThreadStatusUtil::MaybeInitThreadLocalUpdater(const Env* /*env*/) { + return false; +} + +void ThreadStatusUtil::SetColumnFamily(const ColumnFamilyData* /*cfd*/, + const Env* /*env*/, + bool /*enable_thread_tracking*/) {} + +void ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType /*op*/) {} + +void ThreadStatusUtil::SetThreadOperationProperty(int /*code*/, + uint64_t /*value*/) {} + +void ThreadStatusUtil::IncreaseThreadOperationProperty(int /*code*/, + uint64_t /*delta*/) {} + +void ThreadStatusUtil::SetThreadState(ThreadStatus::StateType /*state*/) {} + +void ThreadStatusUtil::NewColumnFamilyInfo(const DB* /*db*/, + const ColumnFamilyData* /*cfd*/, + const std::string& /*cf_name*/, + const Env* /*env*/) {} + +void ThreadStatusUtil::EraseColumnFamilyInfo(const ColumnFamilyData* /*cfd*/) {} + +void ThreadStatusUtil::EraseDatabaseInfo(const DB* /*db*/) {} + +void ThreadStatusUtil::ResetThreadStatus() {} + +AutoThreadOperationStageUpdater::AutoThreadOperationStageUpdater( + ThreadStatus::OperationStage /*stage*/) {} + +AutoThreadOperationStageUpdater::~AutoThreadOperationStageUpdater() {} + +#endif // ROCKSDB_USING_THREAD_STATUS + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/thread_status_util.h b/src/rocksdb/monitoring/thread_status_util.h new file mode 100644 index 000000000..b4d97b0b6 --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_util.h @@ -0,0 +1,134 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <string> + +#include "monitoring/thread_status_updater.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/thread_status.h" + +namespace ROCKSDB_NAMESPACE { + +class ColumnFamilyData; + +// The static utility class for updating thread-local status. +// +// The thread-local status is updated via the thread-local cached +// pointer thread_updater_local_cache_. During each function call, +// when ThreadStatusUtil finds thread_updater_local_cache_ is +// left uninitialized (determined by thread_updater_initialized_), +// it will tries to initialize it using the return value of +// Env::GetThreadStatusUpdater(). When thread_updater_local_cache_ +// is initialized by a non-null pointer, each function call will +// then update the status of the current thread. Otherwise, +// all function calls to ThreadStatusUtil will be no-op. +class ThreadStatusUtil { + public: + // Register the current thread for tracking. + static void RegisterThread( + const Env* env, ThreadStatus::ThreadType thread_type); + + // Unregister the current thread. + static void UnregisterThread(); + + // Create an entry in the global ColumnFamilyInfo table for the + // specified column family. This function should be called only + // when the current thread does not hold db_mutex. + static void NewColumnFamilyInfo(const DB* db, const ColumnFamilyData* cfd, + const std::string& cf_name, const Env* env); + + // Erase the ConstantColumnFamilyInfo that is associated with the + // specified ColumnFamilyData. This function should be called only + // when the current thread does not hold db_mutex. + static void EraseColumnFamilyInfo(const ColumnFamilyData* cfd); + + // Erase all ConstantColumnFamilyInfo that is associated with the + // specified db instance. This function should be called only when + // the current thread does not hold db_mutex. + static void EraseDatabaseInfo(const DB* db); + + // Update the thread status to indicate the current thread is doing + // something related to the specified column family. + static void SetColumnFamily(const ColumnFamilyData* cfd, const Env* env, + bool enable_thread_tracking); + + static void SetThreadOperation(ThreadStatus::OperationType type); + + static ThreadStatus::OperationStage SetThreadOperationStage( + ThreadStatus::OperationStage stage); + + static void SetThreadOperationProperty( + int code, uint64_t value); + + static void IncreaseThreadOperationProperty( + int code, uint64_t delta); + + static void SetThreadState(ThreadStatus::StateType type); + + static void ResetThreadStatus(); + +#ifndef NDEBUG + static void TEST_SetStateDelay( + const ThreadStatus::StateType state, int micro); + static void TEST_StateDelay(const ThreadStatus::StateType state); +#endif + + protected: + // Initialize the thread-local ThreadStatusUpdater when it finds + // the cached value is nullptr. Returns true if it has cached + // a non-null pointer. + static bool MaybeInitThreadLocalUpdater(const Env* env); + +#ifdef ROCKSDB_USING_THREAD_STATUS + // A boolean flag indicating whether thread_updater_local_cache_ + // is initialized. It is set to true when an Env uses any + // ThreadStatusUtil functions using the current thread other + // than UnregisterThread(). It will be set to false when + // UnregisterThread() is called. + // + // When this variable is set to true, thread_updater_local_cache_ + // will not be updated until this variable is again set to false + // in UnregisterThread(). + static __thread bool thread_updater_initialized_; + + // The thread-local cached ThreadStatusUpdater that caches the + // thread_status_updater_ of the first Env that uses any ThreadStatusUtil + // function other than UnregisterThread(). This variable will + // be cleared when UnregisterThread() is called. + // + // When this variable is set to a non-null pointer, then the status + // of the current thread will be updated when a function of + // ThreadStatusUtil is called. Otherwise, all functions of + // ThreadStatusUtil will be no-op. + // + // When thread_updater_initialized_ is set to true, this variable + // will not be updated until this thread_updater_initialized_ is + // again set to false in UnregisterThread(). + static __thread ThreadStatusUpdater* thread_updater_local_cache_; +#else + static bool thread_updater_initialized_; + static ThreadStatusUpdater* thread_updater_local_cache_; +#endif +}; + +// A helper class for updating thread state. It will set the +// thread state according to the input parameter in its constructor +// and set the thread state to the previous state in its destructor. +class AutoThreadOperationStageUpdater { + public: + explicit AutoThreadOperationStageUpdater( + ThreadStatus::OperationStage stage); + ~AutoThreadOperationStageUpdater(); + +#ifdef ROCKSDB_USING_THREAD_STATUS + private: + ThreadStatus::OperationStage prev_stage_; +#endif +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/monitoring/thread_status_util_debug.cc b/src/rocksdb/monitoring/thread_status_util_debug.cc new file mode 100644 index 000000000..375fe8c0a --- /dev/null +++ b/src/rocksdb/monitoring/thread_status_util_debug.cc @@ -0,0 +1,32 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <atomic> + +#include "monitoring/thread_status_updater.h" +#include "monitoring/thread_status_util.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef NDEBUG +// the delay for debugging purpose. +static std::atomic<int> states_delay[ThreadStatus::NUM_STATE_TYPES]; + +void ThreadStatusUtil::TEST_SetStateDelay( + const ThreadStatus::StateType state, int micro) { + states_delay[state].store(micro, std::memory_order_relaxed); +} + +void ThreadStatusUtil::TEST_StateDelay(const ThreadStatus::StateType state) { + auto delay = states_delay[state].load(std::memory_order_relaxed); + if (delay > 0) { + Env::Default()->SleepForMicroseconds(delay); + } +} + +#endif // !NDEBUG + +} // namespace ROCKSDB_NAMESPACE |