summaryrefslogtreecommitdiffstats
path: root/src/common/QueueRing.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/common/QueueRing.h
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/common/QueueRing.h64
1 files changed, 64 insertions, 0 deletions
diff --git a/src/common/QueueRing.h b/src/common/QueueRing.h
new file mode 100644
index 00000000..af5c47be
--- /dev/null
+++ b/src/common/QueueRing.h
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef QUEUE_RING_H
+#define QUEUE_RING_H
+
+#include "common/ceph_mutex.h"
+
+#include <list>
+#include <atomic>
+#include <vector>
+
+template <class T>
+class QueueRing {
+ struct QueueBucket {
+ ceph::mutex lock = ceph::make_mutex("QueueRing::QueueBucket::lock");
+ ceph::condition_variable cond;
+ typename std::list<T> entries;
+
+ QueueBucket() {}
+ QueueBucket(const QueueBucket& rhs) {
+ entries = rhs.entries;
+ }
+
+ void enqueue(const T& entry) {
+ lock.lock();
+ if (entries.empty()) {
+ cond.notify_all();
+ }
+ entries.push_back(entry);
+ lock.unlock();
+ }
+
+ void dequeue(T *entry) {
+ std::unique_lock l(lock);
+ while (entries.empty()) {
+ cond.wait(l);
+ };
+ ceph_assert(!entries.empty());
+ *entry = entries.front();
+ entries.pop_front();
+ };
+ };
+
+ std::vector<QueueBucket> buckets;
+ int num_buckets;
+
+ std::atomic<int64_t> cur_read_bucket = { 0 };
+ std::atomic<int64_t> cur_write_bucket = { 0 };
+
+public:
+ QueueRing(int n) : buckets(n), num_buckets(n) {
+ }
+
+ void enqueue(const T& entry) {
+ buckets[++cur_write_bucket % num_buckets].enqueue(entry);
+ };
+
+ void dequeue(T *entry) {
+ buckets[++cur_read_bucket % num_buckets].dequeue(entry);
+ }
+};
+
+#endif