summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/flush_scheduler.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rocksdb/db/flush_scheduler.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/flush_scheduler.cc')
-rw-r--r--src/rocksdb/db/flush_scheduler.cc88
1 files changed, 88 insertions, 0 deletions
diff --git a/src/rocksdb/db/flush_scheduler.cc b/src/rocksdb/db/flush_scheduler.cc
new file mode 100644
index 00000000..8735a6b3
--- /dev/null
+++ b/src/rocksdb/db/flush_scheduler.cc
@@ -0,0 +1,88 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/flush_scheduler.h"
+
+#include <cassert>
+
+#include "db/column_family.h"
+
+namespace rocksdb {
+
+void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
+#ifndef NDEBUG
+ std::lock_guard<std::mutex> lock(checking_mutex_);
+ assert(checking_set_.count(cfd) == 0);
+ checking_set_.insert(cfd);
+#endif // NDEBUG
+ cfd->Ref();
+// Suppress false positive clang analyzer warnings.
+#ifndef __clang_analyzer__
+ Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
+ while (!head_.compare_exchange_strong(
+ node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
+ // failing CAS updates the first param, so we are already set for
+ // retry. TakeNextColumnFamily won't happen until after another
+ // inter-thread synchronization, so we don't even need release
+ // semantics for this CAS
+ }
+#endif // __clang_analyzer__
+}
+
+ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
+#ifndef NDEBUG
+ std::lock_guard<std::mutex> lock(checking_mutex_);
+#endif // NDEBUG
+ while (true) {
+ if (head_.load(std::memory_order_relaxed) == nullptr) {
+ return nullptr;
+ }
+
+ // dequeue the head
+ Node* node = head_.load(std::memory_order_relaxed);
+ head_.store(node->next, std::memory_order_relaxed);
+ ColumnFamilyData* cfd = node->column_family;
+ delete node;
+
+#ifndef NDEBUG
+ auto iter = checking_set_.find(cfd);
+ assert(iter != checking_set_.end());
+ checking_set_.erase(iter);
+#endif // NDEBUG
+
+ if (!cfd->IsDropped()) {
+ // success
+ return cfd;
+ }
+
+ // no longer relevant, retry
+ if (cfd->Unref()) {
+ delete cfd;
+ }
+ }
+}
+
+bool FlushScheduler::Empty() {
+#ifndef NDEBUG
+ std::lock_guard<std::mutex> lock(checking_mutex_);
+#endif // NDEBUG
+ auto rv = head_.load(std::memory_order_relaxed) == nullptr;
+#ifndef NDEBUG
+ assert(rv == checking_set_.empty());
+#endif // NDEBUG
+ return rv;
+}
+
+void FlushScheduler::Clear() {
+ ColumnFamilyData* cfd;
+ while ((cfd = TakeNextColumnFamily()) != nullptr) {
+ if (cfd->Unref()) {
+ delete cfd;
+ }
+ }
+ assert(head_.load(std::memory_order_relaxed) == nullptr);
+}
+
+} // namespace rocksdb