diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/zstd/contrib/pzstd/utils | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/zstd/contrib/pzstd/utils')
-rw-r--r-- | src/zstd/contrib/pzstd/utils/BUCK | 75 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/Buffer.h | 99 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/FileSystem.h | 94 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/Likely.h | 28 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/Range.h | 131 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/ResourcePool.h | 96 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/ScopeGuard.h | 50 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/ThreadPool.h | 58 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/WorkQueue.h | 181 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/BUCK | 35 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/BufferTest.cpp | 89 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/RangeTest.cpp | 82 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp | 72 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp | 28 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp | 71 | ||||
-rw-r--r-- | src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp | 282 |
16 files changed, 1471 insertions, 0 deletions
diff --git a/src/zstd/contrib/pzstd/utils/BUCK b/src/zstd/contrib/pzstd/utils/BUCK new file mode 100644 index 000000000..e757f4120 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/BUCK @@ -0,0 +1,75 @@ +cxx_library( + name='buffer', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['Buffer.h'], + deps=[':range'], +) + +cxx_library( + name='file_system', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['FileSystem.h'], + deps=[':range'], +) + +cxx_library( + name='likely', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['Likely.h'], +) + +cxx_library( + name='range', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['Range.h'], + deps=[':likely'], +) + +cxx_library( + name='resource_pool', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['ResourcePool.h'], +) + +cxx_library( + name='scope_guard', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['ScopeGuard.h'], +) + +cxx_library( + name='thread_pool', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['ThreadPool.h'], + deps=[':work_queue'], +) + +cxx_library( + name='work_queue', + visibility=['PUBLIC'], + header_namespace='utils', + exported_headers=['WorkQueue.h'], + deps=[':buffer'], +) + +cxx_library( + name='utils', + visibility=['PUBLIC'], + deps=[ + ':buffer', + ':file_system', + ':likely', + ':range', + ':resource_pool', + ':scope_guard', + ':thread_pool', + ':work_queue', + ], +) diff --git a/src/zstd/contrib/pzstd/utils/Buffer.h b/src/zstd/contrib/pzstd/utils/Buffer.h new file mode 100644 index 000000000..f69c3b4d9 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/Buffer.h @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Range.h" + +#include <array> +#include <cstddef> +#include <memory> + +namespace pzstd { + +/** + * A `Buffer` has a pointer to a shared buffer, and a range of the buffer that + * it owns. + * The idea is that you can allocate one buffer, and write chunks into it + * and break off those chunks. + * The underlying buffer is reference counted, and will be destroyed when all + * `Buffer`s that reference it are destroyed. + */ +class Buffer { + std::shared_ptr<unsigned char> buffer_; + MutableByteRange range_; + + static void delete_buffer(unsigned char* buffer) { + delete[] buffer; + } + + public: + /// Construct an empty buffer that owns no data. + explicit Buffer() {} + + /// Construct a `Buffer` that owns a new underlying buffer of size `size`. + explicit Buffer(std::size_t size) + : buffer_(new unsigned char[size], delete_buffer), + range_(buffer_.get(), buffer_.get() + size) {} + + explicit Buffer(std::shared_ptr<unsigned char> buffer, MutableByteRange data) + : buffer_(buffer), range_(data) {} + + Buffer(Buffer&&) = default; + Buffer& operator=(Buffer&&) & = default; + + /** + * Splits the data into two pieces: [begin, begin + n), [begin + n, end). + * Their data both points into the same underlying buffer. + * Modifies the original `Buffer` to point to only [begin + n, end). + * + * @param n The offset to split at. + * @returns A buffer that owns the data [begin, begin + n). + */ + Buffer splitAt(std::size_t n) { + auto firstPiece = range_.subpiece(0, n); + range_.advance(n); + return Buffer(buffer_, firstPiece); + } + + /// Modifies the buffer to point to the range [begin + n, end). + void advance(std::size_t n) { + range_.advance(n); + } + + /// Modifies the buffer to point to the range [begin, end - n). + void subtract(std::size_t n) { + range_.subtract(n); + } + + /// Returns a read only `Range` pointing to the `Buffer`s data. + ByteRange range() const { + return range_; + } + /// Returns a mutable `Range` pointing to the `Buffer`s data. + MutableByteRange range() { + return range_; + } + + const unsigned char* data() const { + return range_.data(); + } + + unsigned char* data() { + return range_.data(); + } + + std::size_t size() const { + return range_.size(); + } + + bool empty() const { + return range_.empty(); + } +}; +} diff --git a/src/zstd/contrib/pzstd/utils/FileSystem.h b/src/zstd/contrib/pzstd/utils/FileSystem.h new file mode 100644 index 000000000..3cfbe86e5 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/FileSystem.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Range.h" + +#include <sys/stat.h> +#include <cerrno> +#include <cstdint> +#include <system_error> + +// A small subset of `std::filesystem`. +// `std::filesystem` should be a drop in replacement. +// See http://en.cppreference.com/w/cpp/filesystem for documentation. + +namespace pzstd { + +// using file_status = ... causes gcc to emit a false positive warning +#if defined(_MSC_VER) +typedef struct ::_stat64 file_status; +#else +typedef struct ::stat file_status; +#endif + +/// http://en.cppreference.com/w/cpp/filesystem/status +inline file_status status(StringPiece path, std::error_code& ec) noexcept { + file_status status; +#if defined(_MSC_VER) + const auto error = ::_stat64(path.data(), &status); +#else + const auto error = ::stat(path.data(), &status); +#endif + if (error) { + ec.assign(errno, std::generic_category()); + } else { + ec.clear(); + } + return status; +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file +inline bool is_regular_file(file_status status) noexcept { +#if defined(S_ISREG) + return S_ISREG(status.st_mode); +#elif !defined(S_ISREG) && defined(S_IFMT) && defined(S_IFREG) + return (status.st_mode & S_IFMT) == S_IFREG; +#else + static_assert(false, "No POSIX stat() support."); +#endif +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_regular_file +inline bool is_regular_file(StringPiece path, std::error_code& ec) noexcept { + return is_regular_file(status(path, ec)); +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_directory +inline bool is_directory(file_status status) noexcept { +#if defined(S_ISDIR) + return S_ISDIR(status.st_mode); +#elif !defined(S_ISDIR) && defined(S_IFMT) && defined(S_IFDIR) + return (status.st_mode & S_IFMT) == S_IFDIR; +#else + static_assert(false, "NO POSIX stat() support."); +#endif +} + +/// http://en.cppreference.com/w/cpp/filesystem/is_directory +inline bool is_directory(StringPiece path, std::error_code& ec) noexcept { + return is_directory(status(path, ec)); +} + +/// http://en.cppreference.com/w/cpp/filesystem/file_size +inline std::uintmax_t file_size( + StringPiece path, + std::error_code& ec) noexcept { + auto stat = status(path, ec); + if (ec) { + return -1; + } + if (!is_regular_file(stat)) { + ec.assign(ENOTSUP, std::generic_category()); + return -1; + } + ec.clear(); + return stat.st_size; +} +} diff --git a/src/zstd/contrib/pzstd/utils/Likely.h b/src/zstd/contrib/pzstd/utils/Likely.h new file mode 100644 index 000000000..7cea8da27 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/Likely.h @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ + +/** + * Compiler hints to indicate the fast path of an "if" branch: whether + * the if condition is likely to be true or false. + * + * @author Tudor Bosman (tudorb@fb.com) + */ + +#pragma once + +#undef LIKELY +#undef UNLIKELY + +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif diff --git a/src/zstd/contrib/pzstd/utils/Range.h b/src/zstd/contrib/pzstd/utils/Range.h new file mode 100644 index 000000000..fedb5d786 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/Range.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ + +/** + * A subset of `folly/Range.h`. + * All code copied verbatim modulo formatting + */ +#pragma once + +#include "utils/Likely.h" + +#include <cstddef> +#include <cstring> +#include <stdexcept> +#include <string> +#include <type_traits> + +namespace pzstd { + +namespace detail { +/* + *Use IsCharPointer<T>::type to enable const char* or char*. + *Use IsCharPointer<T>::const_type to enable only const char*. +*/ +template <class T> +struct IsCharPointer {}; + +template <> +struct IsCharPointer<char*> { + typedef int type; +}; + +template <> +struct IsCharPointer<const char*> { + typedef int const_type; + typedef int type; +}; + +} // namespace detail + +template <typename Iter> +class Range { + Iter b_; + Iter e_; + + public: + using size_type = std::size_t; + using iterator = Iter; + using const_iterator = Iter; + using value_type = typename std::remove_reference< + typename std::iterator_traits<Iter>::reference>::type; + using reference = typename std::iterator_traits<Iter>::reference; + + constexpr Range() : b_(), e_() {} + constexpr Range(Iter begin, Iter end) : b_(begin), e_(end) {} + + constexpr Range(Iter begin, size_type size) : b_(begin), e_(begin + size) {} + + template <class T = Iter, typename detail::IsCharPointer<T>::type = 0> + /* implicit */ Range(Iter str) : b_(str), e_(str + std::strlen(str)) {} + + template <class T = Iter, typename detail::IsCharPointer<T>::const_type = 0> + /* implicit */ Range(const std::string& str) + : b_(str.data()), e_(b_ + str.size()) {} + + // Allow implicit conversion from Range<From> to Range<To> if From is + // implicitly convertible to To. + template < + class OtherIter, + typename std::enable_if< + (!std::is_same<Iter, OtherIter>::value && + std::is_convertible<OtherIter, Iter>::value), + int>::type = 0> + constexpr /* implicit */ Range(const Range<OtherIter>& other) + : b_(other.begin()), e_(other.end()) {} + + Range(const Range&) = default; + Range(Range&&) = default; + + Range& operator=(const Range&) & = default; + Range& operator=(Range&&) & = default; + + constexpr size_type size() const { + return e_ - b_; + } + bool empty() const { + return b_ == e_; + } + Iter data() const { + return b_; + } + Iter begin() const { + return b_; + } + Iter end() const { + return e_; + } + + void advance(size_type n) { + if (UNLIKELY(n > size())) { + throw std::out_of_range("index out of range"); + } + b_ += n; + } + + void subtract(size_type n) { + if (UNLIKELY(n > size())) { + throw std::out_of_range("index out of range"); + } + e_ -= n; + } + + Range subpiece(size_type first, size_type length = std::string::npos) const { + if (UNLIKELY(first > size())) { + throw std::out_of_range("index out of range"); + } + + return Range(b_ + first, std::min(length, size() - first)); + } +}; + +using ByteRange = Range<const unsigned char*>; +using MutableByteRange = Range<unsigned char*>; +using StringPiece = Range<const char*>; +} diff --git a/src/zstd/contrib/pzstd/utils/ResourcePool.h b/src/zstd/contrib/pzstd/utils/ResourcePool.h new file mode 100644 index 000000000..8dfcdd765 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/ResourcePool.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include <cassert> +#include <functional> +#include <memory> +#include <mutex> +#include <vector> + +namespace pzstd { + +/** + * An unbounded pool of resources. + * A `ResourcePool<T>` requires a factory function that takes allocates `T*` and + * a free function that frees a `T*`. + * Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr` + * to a `T`, and when it goes out of scope the resource will be returned to the + * pool. + * The `ResourcePool<T>` *must* survive longer than any resources it hands out. + * Remember that `ResourcePool<T>` hands out mutable `T`s, so make sure to clean + * up the resource before or after every use. + */ +template <typename T> +class ResourcePool { + public: + class Deleter; + using Factory = std::function<T*()>; + using Free = std::function<void(T*)>; + using UniquePtr = std::unique_ptr<T, Deleter>; + + private: + std::mutex mutex_; + Factory factory_; + Free free_; + std::vector<T*> resources_; + unsigned inUse_; + + public: + /** + * Creates a `ResourcePool`. + * + * @param factory The function to use to create new resources. + * @param free The function to use to free resources created by `factory`. + */ + ResourcePool(Factory factory, Free free) + : factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {} + + /** + * @returns A unique pointer to a resource. The resource is null iff + * there are no available resources and `factory()` returns null. + */ + UniquePtr get() { + std::lock_guard<std::mutex> lock(mutex_); + if (!resources_.empty()) { + UniquePtr resource{resources_.back(), Deleter{*this}}; + resources_.pop_back(); + ++inUse_; + return resource; + } + UniquePtr resource{factory_(), Deleter{*this}}; + ++inUse_; + return resource; + } + + ~ResourcePool() noexcept { + assert(inUse_ == 0); + for (const auto resource : resources_) { + free_(resource); + } + } + + class Deleter { + ResourcePool *pool_; + public: + explicit Deleter(ResourcePool &pool) : pool_(&pool) {} + + void operator() (T *resource) { + std::lock_guard<std::mutex> lock(pool_->mutex_); + // Make sure we don't put null resources into the pool + if (resource) { + pool_->resources_.push_back(resource); + } + assert(pool_->inUse_ > 0); + --pool_->inUse_; + } + }; +}; + +} diff --git a/src/zstd/contrib/pzstd/utils/ScopeGuard.h b/src/zstd/contrib/pzstd/utils/ScopeGuard.h new file mode 100644 index 000000000..31768f43d --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/ScopeGuard.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include <utility> + +namespace pzstd { + +/** + * Dismissable scope guard. + * `Function` must be callable and take no parameters. + * Unless `dissmiss()` is called, the callable is executed upon destruction of + * `ScopeGuard`. + * + * Example: + * + * auto guard = makeScopeGuard([&] { cleanup(); }); + */ +template <typename Function> +class ScopeGuard { + Function function; + bool dismissed; + + public: + explicit ScopeGuard(Function&& function) + : function(std::move(function)), dismissed(false) {} + + void dismiss() { + dismissed = true; + } + + ~ScopeGuard() noexcept { + if (!dismissed) { + function(); + } + } +}; + +/// Creates a scope guard from `function`. +template <typename Function> +ScopeGuard<Function> makeScopeGuard(Function&& function) { + return ScopeGuard<Function>(std::forward<Function>(function)); +} +} diff --git a/src/zstd/contrib/pzstd/utils/ThreadPool.h b/src/zstd/contrib/pzstd/utils/ThreadPool.h new file mode 100644 index 000000000..8ece8e0da --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/ThreadPool.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/WorkQueue.h" + +#include <cstddef> +#include <functional> +#include <thread> +#include <vector> + +namespace pzstd { +/// A simple thread pool that pulls tasks off its queue in FIFO order. +class ThreadPool { + std::vector<std::thread> threads_; + + WorkQueue<std::function<void()>> tasks_; + + public: + /// Constructs a thread pool with `numThreads` threads. + explicit ThreadPool(std::size_t numThreads) { + threads_.reserve(numThreads); + for (std::size_t i = 0; i < numThreads; ++i) { + threads_.emplace_back([this] { + std::function<void()> task; + while (tasks_.pop(task)) { + task(); + } + }); + } + } + + /// Finishes all tasks currently in the queue. + ~ThreadPool() { + tasks_.finish(); + for (auto& thread : threads_) { + thread.join(); + } + } + + /** + * Adds `task` to the queue of tasks to execute. Since `task` is a + * `std::function<>`, it cannot be a move only type. So any lambda passed must + * not capture move only types (like `std::unique_ptr`). + * + * @param task The task to execute. + */ + void add(std::function<void()> task) { + tasks_.push(std::move(task)); + } +}; +} diff --git a/src/zstd/contrib/pzstd/utils/WorkQueue.h b/src/zstd/contrib/pzstd/utils/WorkQueue.h new file mode 100644 index 000000000..1d14d922c --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/WorkQueue.h @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include "utils/Buffer.h" + +#include <atomic> +#include <cassert> +#include <cstddef> +#include <condition_variable> +#include <cstddef> +#include <functional> +#include <mutex> +#include <queue> + +namespace pzstd { + +/// Unbounded thread-safe work queue. +template <typename T> +class WorkQueue { + // Protects all member variable access + std::mutex mutex_; + std::condition_variable readerCv_; + std::condition_variable writerCv_; + std::condition_variable finishCv_; + + std::queue<T> queue_; + bool done_; + std::size_t maxSize_; + + // Must have lock to call this function + bool full() const { + if (maxSize_ == 0) { + return false; + } + return queue_.size() >= maxSize_; + } + + public: + /** + * Constructs an empty work queue with an optional max size. + * If `maxSize == 0` the queue size is unbounded. + * + * @param maxSize The maximum allowed size of the work queue. + */ + WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} + + /** + * Push an item onto the work queue. Notify a single thread that work is + * available. If `finish()` has been called, do nothing and return false. + * If `push()` returns false, then `item` has not been moved from. + * + * @param item Item to push onto the queue. + * @returns True upon success, false if `finish()` has been called. An + * item was pushed iff `push()` returns true. + */ + bool push(T&& item) { + { + std::unique_lock<std::mutex> lock(mutex_); + while (full() && !done_) { + writerCv_.wait(lock); + } + if (done_) { + return false; + } + queue_.push(std::move(item)); + } + readerCv_.notify_one(); + return true; + } + + /** + * Attempts to pop an item off the work queue. It will block until data is + * available or `finish()` has been called. + * + * @param[out] item If `pop` returns `true`, it contains the popped item. + * If `pop` returns `false`, it is unmodified. + * @returns True upon success. False if the queue is empty and + * `finish()` has been called. + */ + bool pop(T& item) { + { + std::unique_lock<std::mutex> lock(mutex_); + while (queue_.empty() && !done_) { + readerCv_.wait(lock); + } + if (queue_.empty()) { + assert(done_); + return false; + } + item = std::move(queue_.front()); + queue_.pop(); + } + writerCv_.notify_one(); + return true; + } + + /** + * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. + * + * @param maxSize The new maximum queue size. + */ + void setMaxSize(std::size_t maxSize) { + { + std::lock_guard<std::mutex> lock(mutex_); + maxSize_ = maxSize; + } + writerCv_.notify_all(); + } + + /** + * Promise that `push()` won't be called again, so once the queue is empty + * there will never any more work. + */ + void finish() { + { + std::lock_guard<std::mutex> lock(mutex_); + assert(!done_); + done_ = true; + } + readerCv_.notify_all(); + writerCv_.notify_all(); + finishCv_.notify_all(); + } + + /// Blocks until `finish()` has been called (but the queue may not be empty). + void waitUntilFinished() { + std::unique_lock<std::mutex> lock(mutex_); + while (!done_) { + finishCv_.wait(lock); + } + } +}; + +/// Work queue for `Buffer`s that knows the total number of bytes in the queue. +class BufferWorkQueue { + WorkQueue<Buffer> queue_; + std::atomic<std::size_t> size_; + + public: + BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {} + + void push(Buffer buffer) { + size_.fetch_add(buffer.size()); + queue_.push(std::move(buffer)); + } + + bool pop(Buffer& buffer) { + bool result = queue_.pop(buffer); + if (result) { + size_.fetch_sub(buffer.size()); + } + return result; + } + + void setMaxSize(std::size_t maxSize) { + queue_.setMaxSize(maxSize); + } + + void finish() { + queue_.finish(); + } + + /** + * Blocks until `finish()` has been called. + * + * @returns The total number of bytes of all the `Buffer`s currently in the + * queue. + */ + std::size_t size() { + queue_.waitUntilFinished(); + return size_.load(); + } +}; +} diff --git a/src/zstd/contrib/pzstd/utils/test/BUCK b/src/zstd/contrib/pzstd/utils/test/BUCK new file mode 100644 index 000000000..a5113cab6 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/BUCK @@ -0,0 +1,35 @@ +cxx_test( + name='buffer_test', + srcs=['BufferTest.cpp'], + deps=['//contrib/pzstd/utils:buffer'], +) + +cxx_test( + name='range_test', + srcs=['RangeTest.cpp'], + deps=['//contrib/pzstd/utils:range'], +) + +cxx_test( + name='resource_pool_test', + srcs=['ResourcePoolTest.cpp'], + deps=['//contrib/pzstd/utils:resource_pool'], +) + +cxx_test( + name='scope_guard_test', + srcs=['ScopeGuardTest.cpp'], + deps=['//contrib/pzstd/utils:scope_guard'], +) + +cxx_test( + name='thread_pool_test', + srcs=['ThreadPoolTest.cpp'], + deps=['//contrib/pzstd/utils:thread_pool'], +) + +cxx_test( + name='work_queue_test', + srcs=['RangeTest.cpp'], + deps=['//contrib/pzstd/utils:work_queue'], +) diff --git a/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp b/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp new file mode 100644 index 000000000..fbba74e82 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/BufferTest.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/Buffer.h" +#include "utils/Range.h" + +#include <gtest/gtest.h> +#include <memory> + +using namespace pzstd; + +namespace { +void deleter(const unsigned char* buf) { + delete[] buf; +} +} + +TEST(Buffer, Constructors) { + Buffer empty; + EXPECT_TRUE(empty.empty()); + EXPECT_EQ(0, empty.size()); + + Buffer sized(5); + EXPECT_FALSE(sized.empty()); + EXPECT_EQ(5, sized.size()); + + Buffer moved(std::move(sized)); + EXPECT_FALSE(sized.empty()); + EXPECT_EQ(5, sized.size()); + + Buffer assigned; + assigned = std::move(moved); + EXPECT_FALSE(sized.empty()); + EXPECT_EQ(5, sized.size()); +} + +TEST(Buffer, BufferManagement) { + std::shared_ptr<unsigned char> buf(new unsigned char[10], deleter); + { + Buffer acquired(buf, MutableByteRange(buf.get(), buf.get() + 10)); + EXPECT_EQ(2, buf.use_count()); + Buffer moved(std::move(acquired)); + EXPECT_EQ(2, buf.use_count()); + Buffer assigned; + assigned = std::move(moved); + EXPECT_EQ(2, buf.use_count()); + + Buffer split = assigned.splitAt(5); + EXPECT_EQ(3, buf.use_count()); + + split.advance(1); + assigned.subtract(1); + EXPECT_EQ(3, buf.use_count()); + } + EXPECT_EQ(1, buf.use_count()); +} + +TEST(Buffer, Modifiers) { + Buffer buf(10); + { + unsigned char i = 0; + for (auto& byte : buf.range()) { + byte = i++; + } + } + + auto prefix = buf.splitAt(2); + + ASSERT_EQ(2, prefix.size()); + EXPECT_EQ(0, *prefix.data()); + + ASSERT_EQ(8, buf.size()); + EXPECT_EQ(2, *buf.data()); + + buf.advance(2); + EXPECT_EQ(4, *buf.data()); + + EXPECT_EQ(9, *(buf.range().end() - 1)); + + buf.subtract(2); + EXPECT_EQ(7, *(buf.range().end() - 1)); + + EXPECT_EQ(4, buf.size()); +} diff --git a/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp b/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp new file mode 100644 index 000000000..755b50fa6 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/RangeTest.cpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/Range.h" + +#include <gtest/gtest.h> +#include <string> + +using namespace pzstd; + +// Range is directly copied from folly. +// Just some sanity tests to make sure everything seems to work. + +TEST(Range, Constructors) { + StringPiece empty; + EXPECT_TRUE(empty.empty()); + EXPECT_EQ(0, empty.size()); + + std::string str = "hello"; + { + Range<std::string::const_iterator> piece(str.begin(), str.end()); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } + + { + StringPiece piece(str.data(), str.size()); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } + + { + StringPiece piece(str); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } + + { + StringPiece piece(str.c_str()); + EXPECT_EQ(5, piece.size()); + EXPECT_EQ('h', *piece.data()); + EXPECT_EQ('o', *(piece.end() - 1)); + } +} + +TEST(Range, Modifiers) { + StringPiece range("hello world"); + ASSERT_EQ(11, range.size()); + + { + auto hello = range.subpiece(0, 5); + EXPECT_EQ(5, hello.size()); + EXPECT_EQ('h', *hello.data()); + EXPECT_EQ('o', *(hello.end() - 1)); + } + { + auto hello = range; + hello.subtract(6); + EXPECT_EQ(5, hello.size()); + EXPECT_EQ('h', *hello.data()); + EXPECT_EQ('o', *(hello.end() - 1)); + } + { + auto world = range; + world.advance(6); + EXPECT_EQ(5, world.size()); + EXPECT_EQ('w', *world.data()); + EXPECT_EQ('d', *(world.end() - 1)); + } + + std::string expected = "hello world"; + EXPECT_EQ(expected, std::string(range.begin(), range.end())); + EXPECT_EQ(expected, std::string(range.data(), range.size())); +} diff --git a/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp b/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp new file mode 100644 index 000000000..6fe145180 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/ResourcePoolTest.cpp @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/ResourcePool.h" + +#include <gtest/gtest.h> +#include <atomic> +#include <thread> + +using namespace pzstd; + +TEST(ResourcePool, FullTest) { + unsigned numCreated = 0; + unsigned numDeleted = 0; + { + ResourcePool<int> pool( + [&numCreated] { ++numCreated; return new int{5}; }, + [&numDeleted](int *x) { ++numDeleted; delete x; }); + + { + auto i = pool.get(); + EXPECT_EQ(5, *i); + *i = 6; + } + { + auto i = pool.get(); + EXPECT_EQ(6, *i); + auto j = pool.get(); + EXPECT_EQ(5, *j); + *j = 7; + } + { + auto i = pool.get(); + EXPECT_EQ(6, *i); + auto j = pool.get(); + EXPECT_EQ(7, *j); + } + } + EXPECT_EQ(2, numCreated); + EXPECT_EQ(numCreated, numDeleted); +} + +TEST(ResourcePool, ThreadSafe) { + std::atomic<unsigned> numCreated{0}; + std::atomic<unsigned> numDeleted{0}; + { + ResourcePool<int> pool( + [&numCreated] { ++numCreated; return new int{0}; }, + [&numDeleted](int *x) { ++numDeleted; delete x; }); + auto push = [&pool] { + for (int i = 0; i < 100; ++i) { + auto x = pool.get(); + ++*x; + } + }; + std::thread t1{push}; + std::thread t2{push}; + t1.join(); + t2.join(); + + auto x = pool.get(); + auto y = pool.get(); + EXPECT_EQ(200, *x + *y); + } + EXPECT_GE(2, numCreated); + EXPECT_EQ(numCreated, numDeleted); +} diff --git a/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp b/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp new file mode 100644 index 000000000..7bc624da7 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/ScopeGuardTest.cpp @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/ScopeGuard.h" + +#include <gtest/gtest.h> + +using namespace pzstd; + +TEST(ScopeGuard, Dismiss) { + { + auto guard = makeScopeGuard([&] { EXPECT_TRUE(false); }); + guard.dismiss(); + } +} + +TEST(ScopeGuard, Executes) { + bool executed = false; + { + auto guard = makeScopeGuard([&] { executed = true; }); + } + EXPECT_TRUE(executed); +} diff --git a/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp b/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp new file mode 100644 index 000000000..703fd4c9c --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/ThreadPoolTest.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/ThreadPool.h" + +#include <gtest/gtest.h> +#include <atomic> +#include <iostream> +#include <thread> +#include <vector> + +using namespace pzstd; + +TEST(ThreadPool, Ordering) { + std::vector<int> results; + + { + ThreadPool executor(1); + for (int i = 0; i < 10; ++i) { + executor.add([ &results, i ] { results.push_back(i); }); + } + } + + for (int i = 0; i < 10; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(ThreadPool, AllJobsFinished) { + std::atomic<unsigned> numFinished{0}; + std::atomic<bool> start{false}; + { + std::cerr << "Creating executor" << std::endl; + ThreadPool executor(5); + for (int i = 0; i < 10; ++i) { + executor.add([ &numFinished, &start ] { + while (!start.load()) { + std::this_thread::yield(); + } + ++numFinished; + }); + } + std::cerr << "Starting" << std::endl; + start.store(true); + std::cerr << "Finishing" << std::endl; + } + EXPECT_EQ(10, numFinished.load()); +} + +TEST(ThreadPool, AddJobWhileJoining) { + std::atomic<bool> done{false}; + { + ThreadPool executor(1); + executor.add([&executor, &done] { + while (!done.load()) { + std::this_thread::yield(); + } + // Sleep for a second to be sure that we are joining + std::this_thread::sleep_for(std::chrono::seconds(1)); + executor.add([] { + EXPECT_TRUE(false); + }); + }); + done.store(true); + } +} diff --git a/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp b/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp new file mode 100644 index 000000000..14cf77304 --- /dev/null +++ b/src/zstd/contrib/pzstd/utils/test/WorkQueueTest.cpp @@ -0,0 +1,282 @@ +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "utils/Buffer.h" +#include "utils/WorkQueue.h" + +#include <gtest/gtest.h> +#include <iostream> +#include <memory> +#include <mutex> +#include <thread> +#include <vector> + +using namespace pzstd; + +namespace { +struct Popper { + WorkQueue<int>* queue; + int* results; + std::mutex* mutex; + + void operator()() { + int result; + while (queue->pop(result)) { + std::lock_guard<std::mutex> lock(*mutex); + results[result] = result; + } + } +}; +} + +TEST(WorkQueue, SingleThreaded) { + WorkQueue<int> queue; + int result; + + queue.push(5); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + + queue.push(1); + queue.push(2); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(1, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(2, result); + + queue.push(1); + queue.push(2); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(1, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(2, result); + EXPECT_FALSE(queue.pop(result)); + + queue.waitUntilFinished(); +} + +TEST(WorkQueue, SPSC) { + WorkQueue<int> queue; + const int max = 100; + + for (int i = 0; i < 10; ++i) { + queue.push(int{i}); + } + + std::thread thread([ &queue, max ] { + int result; + for (int i = 0;; ++i) { + if (!queue.pop(result)) { + EXPECT_EQ(i, max); + break; + } + EXPECT_EQ(i, result); + } + }); + + std::this_thread::yield(); + for (int i = 10; i < max; ++i) { + queue.push(int{i}); + } + queue.finish(); + + thread.join(); +} + +TEST(WorkQueue, SPMC) { + WorkQueue<int> queue; + std::vector<int> results(50, -1); + std::mutex mutex; + std::vector<std::thread> threads; + for (int i = 0; i < 5; ++i) { + threads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + for (int i = 0; i < 50; ++i) { + queue.push(int{i}); + } + queue.finish(); + + for (auto& thread : threads) { + thread.join(); + } + + for (int i = 0; i < 50; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, MPMC) { + WorkQueue<int> queue; + std::vector<int> results(100, -1); + std::mutex mutex; + std::vector<std::thread> popperThreads; + for (int i = 0; i < 4; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::vector<std::thread> pusherThreads; + for (int i = 0; i < 2; ++i) { + auto min = i * 50; + auto max = (i + 1) * 50; + pusherThreads.emplace_back( + [ &queue, min, max ] { + for (int i = min; i < max; ++i) { + queue.push(int{i}); + } + }); + } + + for (auto& thread : pusherThreads) { + thread.join(); + } + queue.finish(); + + for (auto& thread : popperThreads) { + thread.join(); + } + + for (int i = 0; i < 100; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, BoundedSizeWorks) { + WorkQueue<int> queue(1); + int result; + queue.push(5); + queue.pop(result); + queue.push(5); + queue.pop(result); + queue.push(5); + queue.finish(); + queue.pop(result); + EXPECT_EQ(5, result); +} + +TEST(WorkQueue, BoundedSizePushAfterFinish) { + WorkQueue<int> queue(1); + int result; + queue.push(5); + std::thread pusher([&queue] { + queue.push(6); + }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, SetMaxSize) { + WorkQueue<int> queue(2); + int result; + queue.push(5); + queue.push(6); + queue.setMaxSize(1); + std::thread pusher([&queue] { + queue.push(7); + }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(6, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, BoundedSizeMPMC) { + WorkQueue<int> queue(10); + std::vector<int> results(200, -1); + std::mutex mutex; + std::cerr << "Creating popperThreads" << std::endl; + std::vector<std::thread> popperThreads; + for (int i = 0; i < 4; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::cerr << "Creating pusherThreads" << std::endl; + std::vector<std::thread> pusherThreads; + for (int i = 0; i < 2; ++i) { + auto min = i * 100; + auto max = (i + 1) * 100; + pusherThreads.emplace_back( + [ &queue, min, max ] { + for (int i = min; i < max; ++i) { + queue.push(int{i}); + } + }); + } + + std::cerr << "Joining pusherThreads" << std::endl; + for (auto& thread : pusherThreads) { + thread.join(); + } + std::cerr << "Finishing queue" << std::endl; + queue.finish(); + + std::cerr << "Joining popperThreads" << std::endl; + for (auto& thread : popperThreads) { + thread.join(); + } + + std::cerr << "Inspecting results" << std::endl; + for (int i = 0; i < 200; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, FailedPush) { + WorkQueue<std::unique_ptr<int>> queue; + std::unique_ptr<int> x(new int{5}); + EXPECT_TRUE(queue.push(std::move(x))); + EXPECT_EQ(nullptr, x); + queue.finish(); + x.reset(new int{6}); + EXPECT_FALSE(queue.push(std::move(x))); + EXPECT_NE(nullptr, x); + EXPECT_EQ(6, *x); +} + +TEST(BufferWorkQueue, SizeCalculatedCorrectly) { + { + BufferWorkQueue queue; + queue.finish(); + EXPECT_EQ(0, queue.size()); + } + { + BufferWorkQueue queue; + queue.push(Buffer(10)); + queue.finish(); + EXPECT_EQ(10, queue.size()); + } + { + BufferWorkQueue queue; + queue.push(Buffer(10)); + queue.push(Buffer(5)); + queue.finish(); + EXPECT_EQ(15, queue.size()); + } + { + BufferWorkQueue queue; + queue.push(Buffer(10)); + queue.push(Buffer(5)); + queue.finish(); + Buffer buffer; + queue.pop(buffer); + EXPECT_EQ(5, queue.size()); + } +} |