summaryrefslogtreecommitdiffstats
path: root/src/zstd/contrib/pzstd/utils/WorkQueue.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/zstd/contrib/pzstd/utils/WorkQueue.h
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/zstd/contrib/pzstd/utils/WorkQueue.h')
-rw-r--r--src/zstd/contrib/pzstd/utils/WorkQueue.h181
1 files changed, 181 insertions, 0 deletions
diff --git a/src/zstd/contrib/pzstd/utils/WorkQueue.h b/src/zstd/contrib/pzstd/utils/WorkQueue.h
new file mode 100644
index 000000000..1d14d922c
--- /dev/null
+++ b/src/zstd/contrib/pzstd/utils/WorkQueue.h
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under both the BSD-style license (found in the
+ * LICENSE file in the root directory of this source tree) and the GPLv2 (found
+ * in the COPYING file in the root directory of this source tree).
+ */
+#pragma once
+
+#include "utils/Buffer.h"
+
+#include <atomic>
+#include <cassert>
+#include <cstddef>
+#include <condition_variable>
+#include <cstddef>
+#include <functional>
+#include <mutex>
+#include <queue>
+
+namespace pzstd {
+
+/// Unbounded thread-safe work queue.
+template <typename T>
+class WorkQueue {
+ // Protects all member variable access
+ std::mutex mutex_;
+ std::condition_variable readerCv_;
+ std::condition_variable writerCv_;
+ std::condition_variable finishCv_;
+
+ std::queue<T> queue_;
+ bool done_;
+ std::size_t maxSize_;
+
+ // Must have lock to call this function
+ bool full() const {
+ if (maxSize_ == 0) {
+ return false;
+ }
+ return queue_.size() >= maxSize_;
+ }
+
+ public:
+ /**
+ * Constructs an empty work queue with an optional max size.
+ * If `maxSize == 0` the queue size is unbounded.
+ *
+ * @param maxSize The maximum allowed size of the work queue.
+ */
+ WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
+
+ /**
+ * Push an item onto the work queue. Notify a single thread that work is
+ * available. If `finish()` has been called, do nothing and return false.
+ * If `push()` returns false, then `item` has not been moved from.
+ *
+ * @param item Item to push onto the queue.
+ * @returns True upon success, false if `finish()` has been called. An
+ * item was pushed iff `push()` returns true.
+ */
+ bool push(T&& item) {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (full() && !done_) {
+ writerCv_.wait(lock);
+ }
+ if (done_) {
+ return false;
+ }
+ queue_.push(std::move(item));
+ }
+ readerCv_.notify_one();
+ return true;
+ }
+
+ /**
+ * Attempts to pop an item off the work queue. It will block until data is
+ * available or `finish()` has been called.
+ *
+ * @param[out] item If `pop` returns `true`, it contains the popped item.
+ * If `pop` returns `false`, it is unmodified.
+ * @returns True upon success. False if the queue is empty and
+ * `finish()` has been called.
+ */
+ bool pop(T& item) {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (queue_.empty() && !done_) {
+ readerCv_.wait(lock);
+ }
+ if (queue_.empty()) {
+ assert(done_);
+ return false;
+ }
+ item = std::move(queue_.front());
+ queue_.pop();
+ }
+ writerCv_.notify_one();
+ return true;
+ }
+
+ /**
+ * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
+ *
+ * @param maxSize The new maximum queue size.
+ */
+ void setMaxSize(std::size_t maxSize) {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ maxSize_ = maxSize;
+ }
+ writerCv_.notify_all();
+ }
+
+ /**
+ * Promise that `push()` won't be called again, so once the queue is empty
+ * there will never any more work.
+ */
+ void finish() {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ assert(!done_);
+ done_ = true;
+ }
+ readerCv_.notify_all();
+ writerCv_.notify_all();
+ finishCv_.notify_all();
+ }
+
+ /// Blocks until `finish()` has been called (but the queue may not be empty).
+ void waitUntilFinished() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (!done_) {
+ finishCv_.wait(lock);
+ }
+ }
+};
+
+/// Work queue for `Buffer`s that knows the total number of bytes in the queue.
+class BufferWorkQueue {
+ WorkQueue<Buffer> queue_;
+ std::atomic<std::size_t> size_;
+
+ public:
+ BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
+
+ void push(Buffer buffer) {
+ size_.fetch_add(buffer.size());
+ queue_.push(std::move(buffer));
+ }
+
+ bool pop(Buffer& buffer) {
+ bool result = queue_.pop(buffer);
+ if (result) {
+ size_.fetch_sub(buffer.size());
+ }
+ return result;
+ }
+
+ void setMaxSize(std::size_t maxSize) {
+ queue_.setMaxSize(maxSize);
+ }
+
+ void finish() {
+ queue_.finish();
+ }
+
+ /**
+ * Blocks until `finish()` has been called.
+ *
+ * @returns The total number of bytes of all the `Buffer`s currently in the
+ * queue.
+ */
+ std::size_t size() {
+ queue_.waitUntilFinished();
+ return size_.load();
+ }
+};
+}