diff options
Diffstat (limited to 'src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h')
-rw-r--r-- | src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h b/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h new file mode 100644 index 000000000..bb324fb0a --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h @@ -0,0 +1,318 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <condition_variable> +#include <mutex> + +#include <folly/hash/Hash.h> +#include <folly/Indestructible.h> +#include <folly/Unit.h> + +namespace folly { + +namespace parking_lot_detail { + +struct WaitNodeBase { + const uint64_t key_; + const uint64_t lotid_; + WaitNodeBase* next_{nullptr}; + WaitNodeBase* prev_{nullptr}; + + // tricky: hold both bucket and node mutex to write, either to read + bool signaled_; + std::mutex mutex_; + std::condition_variable cond_; + + WaitNodeBase(uint64_t key, uint64_t lotid) + : key_(key), lotid_(lotid), signaled_(false) {} + + template <typename Clock, typename Duration> + std::cv_status wait(std::chrono::time_point<Clock, Duration> deadline) { + std::cv_status status = std::cv_status::no_timeout; + std::unique_lock<std::mutex> nodeLock(mutex_); + while (!signaled_ && status != std::cv_status::timeout) { + if (deadline != std::chrono::time_point<Clock, Duration>::max()) { + status = cond_.wait_until(nodeLock, deadline); + } else { + cond_.wait(nodeLock); + } + } + return status; + } + + void wake() { + std::lock_guard<std::mutex> nodeLock(mutex_); + signaled_ = true; + cond_.notify_one(); + } + + bool signaled() { + return signaled_; + } +}; + +extern std::atomic<uint64_t> idallocator; + +// Our emulated futex uses 4096 lists of wait nodes. There are two levels +// of locking: a per-list mutex that controls access to the list and a +// per-node mutex, condvar, and bool that are used for the actual wakeups. +// The per-node mutex allows us to do precise wakeups without thundering +// herds. +struct Bucket { + std::mutex mutex_; + WaitNodeBase* head_; + WaitNodeBase* tail_; + std::atomic<uint64_t> count_; + + static Bucket& bucketFor(uint64_t key); + + void push_back(WaitNodeBase* node) { + if (tail_) { + assert(head_); + node->prev_ = tail_; + tail_->next_ = node; + tail_ = node; + } else { + tail_ = node; + head_ = node; + } + } + + void erase(WaitNodeBase* node) { + assert(count_.load(std::memory_order_relaxed) >= 1); + if (head_ == node && tail_ == node) { + assert(node->prev_ == nullptr); + assert(node->next_ == nullptr); + head_ = nullptr; + tail_ = nullptr; + } else if (head_ == node) { + assert(node->prev_ == nullptr); + assert(node->next_); + head_ = node->next_; + head_->prev_ = nullptr; + } else if (tail_ == node) { + assert(node->next_ == nullptr); + assert(node->prev_); + tail_ = node->prev_; + tail_->next_ = nullptr; + } else { + assert(node->next_); + assert(node->prev_); + node->next_->prev_ = node->prev_; + node->prev_->next_ = node->next_; + } + count_.fetch_sub(1, std::memory_order_relaxed); + } +}; + +} // namespace parking_lot_detail + +enum class UnparkControl { + RetainContinue, + RemoveContinue, + RetainBreak, + RemoveBreak, +}; + +enum class ParkResult { + Skip, + Unpark, + Timeout, +}; + +/* + * ParkingLot provides an interface that is similar to Linux's futex + * system call, but with additional functionality. It is implemented + * in a portable way on top of std::mutex and std::condition_variable. + * + * Additional reading: + * https://webkit.org/blog/6161/locking-in-webkit/ + * https://github.com/WebKit/webkit/blob/master/Source/WTF/wtf/ParkingLot.h + * https://locklessinc.com/articles/futex_cheat_sheet/ + * + * The main difference from futex is that park/unpark take lambdas, + * such that nearly anything can be done while holding the bucket + * lock. Unpark() lambda can also be used to wake up any number of + * waiters. + * + * ParkingLot is templated on the data type, however, all ParkingLot + * implementations are backed by a single static array of buckets to + * avoid large memory overhead. Lambdas will only ever be called on + * the specific ParkingLot's nodes. + */ +template <typename Data = Unit> +class ParkingLot { + const uint64_t lotid_; + ParkingLot(const ParkingLot&) = delete; + + struct WaitNode : public parking_lot_detail::WaitNodeBase { + const Data data_; + + template <typename D> + WaitNode(uint64_t key, uint64_t lotid, D&& data) + : WaitNodeBase(key, lotid), data_(std::forward<D>(data)) {} + }; + + public: + ParkingLot() : lotid_(parking_lot_detail::idallocator++) {} + + /* Park API + * + * Key is almost always the address of a variable. + * + * ToPark runs while holding the bucket lock: usually this + * is a check to see if we can sleep, by checking waiter bits. + * + * PreWait is usually used to implement condition variable like + * things, such that you can unlock the condition variable's lock at + * the appropriate time. + */ + template <typename Key, typename D, typename ToPark, typename PreWait> + ParkResult park(const Key key, D&& data, ToPark&& toPark, PreWait&& preWait) { + return park_until( + key, + std::forward<D>(data), + std::forward<ToPark>(toPark), + std::forward<PreWait>(preWait), + std::chrono::steady_clock::time_point::max()); + } + + template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Clock, + typename Duration> + ParkResult park_until( + const Key key, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::time_point<Clock, Duration> deadline); + + template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Rep, + typename Period> + ParkResult park_for( + const Key key, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::duration<Rep, Period>& timeout) { + return park_until( + key, + std::forward<D>(data), + std::forward<ToPark>(toPark), + std::forward<PreWait>(preWait), + timeout + std::chrono::steady_clock::now()); + } + + /* + * Unpark API + * + * Key is the same uniqueaddress used in park(), and is used as a + * hash key for lookup of waiters. + * + * Unparker is a function that is given the Data parameter, and + * returns an UnparkControl. The Remove* results will remove and + * wake the waiter, the Ignore/Stop results will not, while stopping + * or continuing iteration of the waiter list. + */ + template <typename Key, typename Unparker> + void unpark(const Key key, Unparker&& func); +}; + +template <typename Data> +template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Clock, + typename Duration> +ParkResult ParkingLot<Data>::park_until( + const Key bits, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::time_point<Clock, Duration> deadline) { + auto key = hash::twang_mix64(uint64_t(bits)); + auto& bucket = parking_lot_detail::Bucket::bucketFor(key); + WaitNode node(key, lotid_, std::forward<D>(data)); + + { + // A: Must be seq_cst. Matches B. + bucket.count_.fetch_add(1, std::memory_order_seq_cst); + + std::unique_lock<std::mutex> bucketLock(bucket.mutex_); + + if (!std::forward<ToPark>(toPark)()) { + bucketLock.unlock(); + bucket.count_.fetch_sub(1, std::memory_order_relaxed); + return ParkResult::Skip; + } + + bucket.push_back(&node); + } // bucketLock scope + + std::forward<PreWait>(preWait)(); + + auto status = node.wait(deadline); + + if (status == std::cv_status::timeout) { + // it's not really a timeout until we unlink the unsignaled node + std::lock_guard<std::mutex> bucketLock(bucket.mutex_); + if (!node.signaled()) { + bucket.erase(&node); + return ParkResult::Timeout; + } + } + + return ParkResult::Unpark; +} + +template <typename Data> +template <typename Key, typename Func> +void ParkingLot<Data>::unpark(const Key bits, Func&& func) { + auto key = hash::twang_mix64(uint64_t(bits)); + auto& bucket = parking_lot_detail::Bucket::bucketFor(key); + // B: Must be seq_cst. Matches A. If true, A *must* see in seq_cst + // order any atomic updates in toPark() (and matching updates that + // happen before unpark is called) + if (bucket.count_.load(std::memory_order_seq_cst) == 0) { + return; + } + + std::lock_guard<std::mutex> bucketLock(bucket.mutex_); + + for (auto iter = bucket.head_; iter != nullptr;) { + auto node = static_cast<WaitNode*>(iter); + iter = iter->next_; + if (node->key_ == key && node->lotid_ == lotid_) { + auto result = std::forward<Func>(func)(node->data_); + if (result == UnparkControl::RemoveBreak || + result == UnparkControl::RemoveContinue) { + // we unlink, but waiter destroys the node + bucket.erase(node); + + node->wake(); + } + if (result == UnparkControl::RemoveBreak || + result == UnparkControl::RetainBreak) { + return; + } + } + } +} + +} // namespace folly |