summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/monitoring/histogram_windowing.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/monitoring/histogram_windowing.cc
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/monitoring/histogram_windowing.cc')
-rw-r--r--src/rocksdb/monitoring/histogram_windowing.cc202
1 files changed, 202 insertions, 0 deletions
diff --git a/src/rocksdb/monitoring/histogram_windowing.cc b/src/rocksdb/monitoring/histogram_windowing.cc
new file mode 100644
index 00000000..ecd6f090
--- /dev/null
+++ b/src/rocksdb/monitoring/histogram_windowing.cc
@@ -0,0 +1,202 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "monitoring/histogram_windowing.h"
+#include "monitoring/histogram.h"
+#include "util/cast_util.h"
+
+#include <algorithm>
+
+namespace rocksdb {
+
+HistogramWindowingImpl::HistogramWindowingImpl() {
+ env_ = Env::Default();
+ window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
+ Clear();
+}
+
+HistogramWindowingImpl::HistogramWindowingImpl(
+ uint64_t num_windows,
+ uint64_t micros_per_window,
+ uint64_t min_num_per_window) :
+ num_windows_(num_windows),
+ micros_per_window_(micros_per_window),
+ min_num_per_window_(min_num_per_window) {
+ env_ = Env::Default();
+ window_stats_.reset(new HistogramStat[static_cast<size_t>(num_windows_)]);
+ Clear();
+}
+
+HistogramWindowingImpl::~HistogramWindowingImpl() {
+}
+
+void HistogramWindowingImpl::Clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+
+ stats_.Clear();
+ for (size_t i = 0; i < num_windows_; i++) {
+ window_stats_[i].Clear();
+ }
+ current_window_.store(0, std::memory_order_relaxed);
+ last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
+}
+
+bool HistogramWindowingImpl::Empty() const { return stats_.Empty(); }
+
+// This function is designed to be lock free, as it's in the critical path
+// of any operation.
+// Each individual value is atomic, it is just that some samples can go
+// in the older bucket which is tolerable.
+void HistogramWindowingImpl::Add(uint64_t value){
+ TimerTick();
+
+ // Parent (global) member update
+ stats_.Add(value);
+
+ // Current window update
+ window_stats_[static_cast<size_t>(current_window())].Add(value);
+}
+
+void HistogramWindowingImpl::Merge(const Histogram& other) {
+ if (strcmp(Name(), other.Name()) == 0) {
+ Merge(
+ *static_cast_with_check<const HistogramWindowingImpl, const Histogram>(
+ &other));
+ }
+}
+
+void HistogramWindowingImpl::Merge(const HistogramWindowingImpl& other) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ stats_.Merge(other.stats_);
+
+ if (stats_.num_buckets_ != other.stats_.num_buckets_ ||
+ micros_per_window_ != other.micros_per_window_) {
+ return;
+ }
+
+ uint64_t cur_window = current_window();
+ uint64_t other_cur_window = other.current_window();
+ // going backwards for alignment
+ for (unsigned int i = 0;
+ i < std::min(num_windows_, other.num_windows_); i++) {
+ uint64_t window_index =
+ (cur_window + num_windows_ - i) % num_windows_;
+ uint64_t other_window_index =
+ (other_cur_window + other.num_windows_ - i) % other.num_windows_;
+ size_t windex = static_cast<size_t>(window_index);
+ size_t other_windex = static_cast<size_t>(other_window_index);
+
+ window_stats_[windex].Merge(
+ other.window_stats_[other_windex]);
+ }
+}
+
+std::string HistogramWindowingImpl::ToString() const {
+ return stats_.ToString();
+}
+
+double HistogramWindowingImpl::Median() const {
+ return Percentile(50.0);
+}
+
+double HistogramWindowingImpl::Percentile(double p) const {
+ // Retry 3 times in total
+ for (int retry = 0; retry < 3; retry++) {
+ uint64_t start_num = stats_.num();
+ double result = stats_.Percentile(p);
+ // Detect if swap buckets or Clear() was called during calculation
+ if (stats_.num() >= start_num) {
+ return result;
+ }
+ }
+ return 0.0;
+}
+
+double HistogramWindowingImpl::Average() const {
+ return stats_.Average();
+}
+
+double HistogramWindowingImpl::StandardDeviation() const {
+ return stats_.StandardDeviation();
+}
+
+void HistogramWindowingImpl::Data(HistogramData * const data) const {
+ stats_.Data(data);
+}
+
+void HistogramWindowingImpl::TimerTick() {
+ uint64_t curr_time = env_->NowMicros();
+ size_t curr_window_ = static_cast<size_t>(current_window());
+ if (curr_time - last_swap_time() > micros_per_window_ &&
+ window_stats_[curr_window_].num() >= min_num_per_window_) {
+ SwapHistoryBucket();
+ }
+}
+
+void HistogramWindowingImpl::SwapHistoryBucket() {
+ // Threads executing Add() would be competing for this mutex, the first one
+ // who got the metex would take care of the bucket swap, other threads
+ // can skip this.
+ // If mutex is held by Merge() or Clear(), next Add() will take care of the
+ // swap, if needed.
+ if (mutex_.try_lock()) {
+ last_swap_time_.store(env_->NowMicros(), std::memory_order_relaxed);
+
+ uint64_t curr_window = current_window();
+ uint64_t next_window = (curr_window == num_windows_ - 1) ?
+ 0 : curr_window + 1;
+
+ // subtract next buckets from totals and swap to next buckets
+ HistogramStat& stats_to_drop =
+ window_stats_[static_cast<size_t>(next_window)];
+
+ if (!stats_to_drop.Empty()) {
+ for (size_t b = 0; b < stats_.num_buckets_; b++){
+ stats_.buckets_[b].fetch_sub(
+ stats_to_drop.bucket_at(b), std::memory_order_relaxed);
+ }
+
+ if (stats_.min() == stats_to_drop.min()) {
+ uint64_t new_min = std::numeric_limits<uint64_t>::max();
+ for (unsigned int i = 0; i < num_windows_; i++) {
+ if (i != next_window) {
+ uint64_t m = window_stats_[i].min();
+ if (m < new_min) new_min = m;
+ }
+ }
+ stats_.min_.store(new_min, std::memory_order_relaxed);
+ }
+
+ if (stats_.max() == stats_to_drop.max()) {
+ uint64_t new_max = 0;
+ for (unsigned int i = 0; i < num_windows_; i++) {
+ if (i != next_window) {
+ uint64_t m = window_stats_[i].max();
+ if (m > new_max) new_max = m;
+ }
+ }
+ stats_.max_.store(new_max, std::memory_order_relaxed);
+ }
+
+ stats_.num_.fetch_sub(stats_to_drop.num(), std::memory_order_relaxed);
+ stats_.sum_.fetch_sub(stats_to_drop.sum(), std::memory_order_relaxed);
+ stats_.sum_squares_.fetch_sub(
+ stats_to_drop.sum_squares(), std::memory_order_relaxed);
+
+ stats_to_drop.Clear();
+ }
+
+ // advance to next window bucket
+ current_window_.store(next_window, std::memory_order_relaxed);
+
+ mutex_.unlock();
+ }
+}
+
+} // namespace rocksdb