Diffstat (limited to 'src/rocksdb/third-party/folly')
42 files changed, 7331 insertions, 0 deletions
diff --git a/src/rocksdb/third-party/folly/folly/CPortability.h b/src/rocksdb/third-party/folly/folly/CPortability.h new file mode 100644 index 000000000..56cb6b1a5 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/CPortability.h @@ -0,0 +1,27 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +/** + * Macro for marking functions as having public visibility. + */ +#if defined(__GNUC__) +#define FOLLY_EXPORT __attribute__((__visibility__("default"))) +#else +#define FOLLY_EXPORT +#endif + +#if defined(__has_feature) +#define FOLLY_HAS_FEATURE(...) __has_feature(__VA_ARGS__) +#else +#define FOLLY_HAS_FEATURE(...) 0 +#endif + +#if FOLLY_HAS_FEATURE(thread_sanitizer) || __SANITIZE_THREAD__ +#ifndef FOLLY_SANITIZE_THREAD +#define FOLLY_SANITIZE_THREAD 1 +#endif +#endif diff --git a/src/rocksdb/third-party/folly/folly/ConstexprMath.h b/src/rocksdb/third-party/folly/folly/ConstexprMath.h new file mode 100644 index 000000000..f09167e0d --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/ConstexprMath.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +namespace folly { +template <typename T> +constexpr T constexpr_max(T a) { + return a; +} +template <typename T, typename... Ts> +constexpr T constexpr_max(T a, T b, Ts... ts) { + return b < a ? constexpr_max(a, ts...) : constexpr_max(b, ts...); +} + +namespace detail { +template <typename T> +constexpr T constexpr_log2_(T a, T e) { + return e == T(1) ? a : constexpr_log2_(a + T(1), e / T(2)); +} + +template <typename T> +constexpr T constexpr_log2_ceil_(T l2, T t) { + return l2 + T(T(1) << l2 < t ? 1 : 0); +} + +template <typename T> +constexpr T constexpr_square_(T t) { + return t * t; +} +} // namespace detail + +template <typename T> +constexpr T constexpr_log2(T t) { + return detail::constexpr_log2_(T(0), t); +} + +template <typename T> +constexpr T constexpr_log2_ceil(T t) { + return detail::constexpr_log2_ceil_(constexpr_log2(t), t); +} + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/Indestructible.h b/src/rocksdb/third-party/folly/folly/Indestructible.h new file mode 100644 index 000000000..68249d865 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/Indestructible.h @@ -0,0 +1,166 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <cassert> +#include <type_traits> +#include <utility> + +#include <folly/Traits.h> + +namespace folly { + +/*** + * Indestructible + * + * When you need a Meyers singleton that will not get destructed, even at + * shutdown, and you also want the object stored inline. 
+ * + * Use like: + * + * void doSomethingWithExpensiveData(); + * + * void doSomethingWithExpensiveData() { + * static const Indestructible<map<string, int>> data{ + * map<string, int>{{"key1", 17}, {"key2", 19}, {"key3", 23}}, + * }; + * callSomethingTakingAMapByRef(*data); + * } + * + * This should be used only for Meyers singletons, and, even then, only when + * the instance does not need to be destructed ever. + * + * This should not be used more generally, e.g., as member fields, etc. + * + * This is designed as an alternative, but with one fewer allocation at + * construction time and one fewer pointer dereference at access time, to the + * Meyers singleton pattern of: + * + * void doSomethingWithExpensiveData() { + * static const auto data = // never `delete`d + * new map<string, int>{{"key1", 17}, {"key2", 19}, {"key3", 23}}; + * callSomethingTakingAMapByRef(*data); + * } + */ + +template <typename T> +class Indestructible final { + public: + template <typename S = T, typename = decltype(S())> + constexpr Indestructible() noexcept(noexcept(T())) {} + + /** + * Constructor accepting a single argument by forwarding reference, this + * allows using list initialzation without the overhead of things like + * in_place, etc and also works with std::initializer_list constructors + * which can't be deduced, the default parameter helps there. + * + * auto i = folly::Indestructible<std::map<int, int>>{{{1, 2}}}; + * + * This provides convenience + * + * There are two versions of this constructor - one for when the element is + * implicitly constructible from the given argument and one for when the + * type is explicitly but not implicitly constructible from the given + * argument. + */ + template < + typename U = T, + _t<std::enable_if<std::is_constructible<T, U&&>::value>>* = nullptr, + _t<std::enable_if< + !std::is_same<Indestructible<T>, remove_cvref_t<U>>::value>>* = + nullptr, + _t<std::enable_if<!std::is_convertible<U&&, T>::value>>* = nullptr> + explicit constexpr Indestructible(U&& u) noexcept( + noexcept(T(std::declval<U>()))) + : storage_(std::forward<U>(u)) {} + template < + typename U = T, + _t<std::enable_if<std::is_constructible<T, U&&>::value>>* = nullptr, + _t<std::enable_if< + !std::is_same<Indestructible<T>, remove_cvref_t<U>>::value>>* = + nullptr, + _t<std::enable_if<std::is_convertible<U&&, T>::value>>* = nullptr> + /* implicit */ constexpr Indestructible(U&& u) noexcept( + noexcept(T(std::declval<U>()))) + : storage_(std::forward<U>(u)) {} + + template <typename... Args, typename = decltype(T(std::declval<Args>()...))> + explicit constexpr Indestructible(Args&&... args) noexcept( + noexcept(T(std::declval<Args>()...))) + : storage_(std::forward<Args>(args)...) {} + template < + typename U, + typename... Args, + typename = decltype( + T(std::declval<std::initializer_list<U>&>(), + std::declval<Args>()...))> + explicit constexpr Indestructible(std::initializer_list<U> il, Args... args) noexcept( + noexcept( + T(std::declval<std::initializer_list<U>&>(), + std::declval<Args>()...))) + : storage_(il, std::forward<Args>(args)...) 
{} + + ~Indestructible() = default; + + Indestructible(Indestructible const&) = delete; + Indestructible& operator=(Indestructible const&) = delete; + + Indestructible(Indestructible&& other) noexcept( + noexcept(T(std::declval<T>()))) + : storage_(std::move(other.storage_.value)) { + other.erased_ = true; + } + Indestructible& operator=(Indestructible&& other) noexcept( + noexcept(T(std::declval<T>()))) { + storage_.value = std::move(other.storage_.value); + other.erased_ = true; + } + + T* get() noexcept { + check(); + return &storage_.value; + } + T const* get() const noexcept { + check(); + return &storage_.value; + } + T& operator*() noexcept { + return *get(); + } + T const& operator*() const noexcept { + return *get(); + } + T* operator->() noexcept { + return get(); + } + T const* operator->() const noexcept { + return get(); + } + + private: + void check() const noexcept { + assert(!erased_); + } + + union Storage { + T value; + + template <typename S = T, typename = decltype(S())> + constexpr Storage() noexcept(noexcept(T())) : value() {} + + template <typename... Args, typename = decltype(T(std::declval<Args>()...))> + explicit constexpr Storage(Args&&... args) noexcept( + noexcept(T(std::declval<Args>()...))) + : value(std::forward<Args>(args)...) {} + + ~Storage() {} + }; + + Storage storage_{}; + bool erased_{false}; +}; +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/Optional.h b/src/rocksdb/third-party/folly/folly/Optional.h new file mode 100644 index 000000000..ee12467dd --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/Optional.h @@ -0,0 +1,570 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +/* + * Optional - For conditional initialization of values, like boost::optional, + * but with support for move semantics and emplacement. Reference type support + * has not been included due to limited use cases and potential confusion with + * semantics of assignment: Assigning to an optional reference could quite + * reasonably copy its value or redirect the reference. 
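 *
 * folly::Optional tracks the C++17 std::optional interface closely: folly::none
 * plays roughly the role of std::nullopt, and both `opt = {}` and `opt = none`
 * clear the value. A minimal sketch of the basic lifecycle:
 *
 *   folly::Optional<int> opt;     // empty
 *   opt = 3;                      // engaged, *opt == 3
 *   opt = folly::none;            // empty again, equivalent to opt.reset()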
+ * + * Optional can be useful when a variable might or might not be needed: + * + * Optional<Logger> maybeLogger = ...; + * if (maybeLogger) { + * maybeLogger->log("hello"); + * } + * + * Optional enables a 'null' value for types which do not otherwise have + * nullability, especially useful for parameter passing: + * + * void testIterator(const unique_ptr<Iterator>& it, + * initializer_list<int> idsExpected, + * Optional<initializer_list<int>> ranksExpected = none) { + * for (int i = 0; it->next(); ++i) { + * EXPECT_EQ(it->doc().id(), idsExpected[i]); + * if (ranksExpected) { + * EXPECT_EQ(it->doc().rank(), (*ranksExpected)[i]); + * } + * } + * } + * + * Optional models OptionalPointee, so calling 'get_pointer(opt)' will return a + * pointer to nullptr if the 'opt' is empty, and a pointer to the value if it is + * not: + * + * Optional<int> maybeInt = ...; + * if (int* v = get_pointer(maybeInt)) { + * cout << *v << endl; + * } + */ + +#include <cstddef> +#include <functional> +#include <new> +#include <stdexcept> +#include <type_traits> +#include <utility> + +#include <folly/CPortability.h> +#include <folly/Traits.h> +#include <folly/Utility.h> + +namespace folly { + +template <class Value> +class Optional; + +namespace detail { +template <class Value> +struct OptionalPromiseReturn; +} // namespace detail + +struct None { + enum class _secret { _token }; + + /** + * No default constructor to support both `op = {}` and `op = none` + * as syntax for clearing an Optional, just like std::nullopt_t. + */ + constexpr explicit None(_secret) {} +}; +constexpr None none{None::_secret::_token}; + +class FOLLY_EXPORT OptionalEmptyException : public std::runtime_error { + public: + OptionalEmptyException() + : std::runtime_error("Empty Optional cannot be unwrapped") {} +}; + +template <class Value> +class Optional { + public: + typedef Value value_type; + + static_assert( + !std::is_reference<Value>::value, + "Optional may not be used with reference types"); + static_assert( + !std::is_abstract<Value>::value, + "Optional may not be used with abstract types"); + + Optional() noexcept {} + + Optional(const Optional& src) noexcept( + std::is_nothrow_copy_constructible<Value>::value) { + if (src.hasValue()) { + construct(src.value()); + } + } + + Optional(Optional&& src) noexcept( + std::is_nothrow_move_constructible<Value>::value) { + if (src.hasValue()) { + construct(std::move(src.value())); + src.clear(); + } + } + + /* implicit */ Optional(const None&) noexcept {} + + /* implicit */ Optional(Value&& newValue) noexcept( + std::is_nothrow_move_constructible<Value>::value) { + construct(std::move(newValue)); + } + + /* implicit */ Optional(const Value& newValue) noexcept( + std::is_nothrow_copy_constructible<Value>::value) { + construct(newValue); + } + + template <typename... Args> + explicit Optional(in_place_t, Args&&... args) noexcept( + std::is_nothrow_constructible<Value, Args...>::value) + : Optional{PrivateConstructor{}, std::forward<Args>(args)...} {} + + template <typename U, typename... Args> + explicit Optional( + in_place_t, + std::initializer_list<U> il, + Args&&... 
args) noexcept(std:: + is_nothrow_constructible< + Value, + std::initializer_list<U>, + Args...>::value) + : Optional{PrivateConstructor{}, il, std::forward<Args>(args)...} {} + + // Used only when an Optional is used with coroutines on MSVC + /* implicit */ Optional(const detail::OptionalPromiseReturn<Value>& p) + : Optional{} { + p.promise_->value_ = this; + } + + void assign(const None&) { + clear(); + } + + void assign(Optional&& src) { + if (this != &src) { + if (src.hasValue()) { + assign(std::move(src.value())); + src.clear(); + } else { + clear(); + } + } + } + + void assign(const Optional& src) { + if (src.hasValue()) { + assign(src.value()); + } else { + clear(); + } + } + + void assign(Value&& newValue) { + if (hasValue()) { + storage_.value = std::move(newValue); + } else { + construct(std::move(newValue)); + } + } + + void assign(const Value& newValue) { + if (hasValue()) { + storage_.value = newValue; + } else { + construct(newValue); + } + } + + Optional& operator=(None) noexcept { + reset(); + return *this; + } + + template <class Arg> + Optional& operator=(Arg&& arg) { + assign(std::forward<Arg>(arg)); + return *this; + } + + Optional& operator=(Optional&& other) noexcept( + std::is_nothrow_move_assignable<Value>::value) { + assign(std::move(other)); + return *this; + } + + Optional& operator=(const Optional& other) noexcept( + std::is_nothrow_copy_assignable<Value>::value) { + assign(other); + return *this; + } + + template <class... Args> + Value& emplace(Args&&... args) { + clear(); + construct(std::forward<Args>(args)...); + return value(); + } + + template <class U, class... Args> + typename std::enable_if< + std::is_constructible<Value, std::initializer_list<U>&, Args&&...>::value, + Value&>::type + emplace(std::initializer_list<U> ilist, Args&&... args) { + clear(); + construct(ilist, std::forward<Args>(args)...); + return value(); + } + + void reset() noexcept { + storage_.clear(); + } + + void clear() noexcept { + reset(); + } + + void swap(Optional& that) noexcept(IsNothrowSwappable<Value>::value) { + if (hasValue() && that.hasValue()) { + using std::swap; + swap(value(), that.value()); + } else if (hasValue()) { + that.emplace(std::move(value())); + reset(); + } else if (that.hasValue()) { + emplace(std::move(that.value())); + that.reset(); + } + } + + const Value& value() const& { + require_value(); + return storage_.value; + } + + Value& value() & { + require_value(); + return storage_.value; + } + + Value&& value() && { + require_value(); + return std::move(storage_.value); + } + + const Value&& value() const&& { + require_value(); + return std::move(storage_.value); + } + + const Value* get_pointer() const& { + return storage_.hasValue ? &storage_.value : nullptr; + } + Value* get_pointer() & { + return storage_.hasValue ? &storage_.value : nullptr; + } + Value* get_pointer() && = delete; + + bool has_value() const noexcept { + return storage_.hasValue; + } + + bool hasValue() const noexcept { + return has_value(); + } + + explicit operator bool() const noexcept { + return has_value(); + } + + const Value& operator*() const& { + return value(); + } + Value& operator*() & { + return value(); + } + const Value&& operator*() const&& { + return std::move(value()); + } + Value&& operator*() && { + return std::move(value()); + } + + const Value* operator->() const { + return &value(); + } + Value* operator->() { + return &value(); + } + + // Return a copy of the value if set, or a given default if not. 
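  // A small illustrative example:
  //
  //   folly::Optional<int> port;
  //   int p = port.value_or(8080);   // p == 8080, port stays empty
  //   port = 443;
  //   p = port.value_or(8080);       // p == 443
  //
  // The fallback argument is forwarded, so passing an rvalue lets the returned
  // Value be constructed by move when the Optional is empty.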
+ template <class U> + Value value_or(U&& dflt) const& { + if (storage_.hasValue) { + return storage_.value; + } + + return std::forward<U>(dflt); + } + + template <class U> + Value value_or(U&& dflt) && { + if (storage_.hasValue) { + return std::move(storage_.value); + } + + return std::forward<U>(dflt); + } + + private: + template <class T> + friend Optional<_t<std::decay<T>>> make_optional(T&&); + template <class T, class... Args> + friend Optional<T> make_optional(Args&&... args); + template <class T, class U, class... As> + friend Optional<T> make_optional(std::initializer_list<U>, As&&...); + + /** + * Construct the optional in place, this is duplicated as a non-explicit + * constructor to allow returning values that are non-movable from + * make_optional using list initialization. + * + * Until C++17, at which point this will become unnecessary because of + * specified prvalue elision. + */ + struct PrivateConstructor { + explicit PrivateConstructor() = default; + }; + template <typename... Args> + Optional(PrivateConstructor, Args&&... args) noexcept( + std::is_constructible<Value, Args&&...>::value) { + construct(std::forward<Args>(args)...); + } + + void require_value() const { + if (!storage_.hasValue) { + throw OptionalEmptyException{}; + } + } + + template <class... Args> + void construct(Args&&... args) { + const void* ptr = &storage_.value; + // For supporting const types. + new (const_cast<void*>(ptr)) Value(std::forward<Args>(args)...); + storage_.hasValue = true; + } + + struct StorageTriviallyDestructible { + union { + char emptyState; + Value value; + }; + bool hasValue; + + StorageTriviallyDestructible() + : emptyState('\0'), hasValue{false} {} + void clear() { + hasValue = false; + } + }; + + struct StorageNonTriviallyDestructible { + union { + char emptyState; + Value value; + }; + bool hasValue; + + StorageNonTriviallyDestructible() : hasValue{false} {} + ~StorageNonTriviallyDestructible() { + clear(); + } + + void clear() { + if (hasValue) { + hasValue = false; + value.~Value(); + } + } + }; + + using Storage = typename std::conditional< + std::is_trivially_destructible<Value>::value, + StorageTriviallyDestructible, + StorageNonTriviallyDestructible>::type; + + Storage storage_; +}; + +template <class T> +const T* get_pointer(const Optional<T>& opt) { + return opt.get_pointer(); +} + +template <class T> +T* get_pointer(Optional<T>& opt) { + return opt.get_pointer(); +} + +template <class T> +void swap(Optional<T>& a, Optional<T>& b) noexcept(noexcept(a.swap(b))) { + a.swap(b); +} + +template <class T> +Optional<_t<std::decay<T>>> make_optional(T&& v) { + using PrivateConstructor = + typename folly::Optional<_t<std::decay<T>>>::PrivateConstructor; + return {PrivateConstructor{}, std::forward<T>(v)}; +} + +template <class T, class... Args> +folly::Optional<T> make_optional(Args&&... args) { + using PrivateConstructor = typename folly::Optional<T>::PrivateConstructor; + return {PrivateConstructor{}, std::forward<Args>(args)...}; +} + +template <class T, class U, class... Args> +folly::Optional<T> make_optional( + std::initializer_list<U> il, + Args&&... args) { + using PrivateConstructor = typename folly::Optional<T>::PrivateConstructor; + return {PrivateConstructor{}, il, std::forward<Args>(args)...}; +} + +/////////////////////////////////////////////////////////////////////////////// +// Comparisons. 
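//
// An empty Optional compares equal to none and to another empty Optional, and
// orders before any engaged Optional; two engaged Optionals compare by value.
// The relational operators (<, <=, >, >=) between an Optional<V> and a raw V
// are deleted below to avoid surprises from the implicit conversion, while
// == and != against a raw value remain available. Roughly:
//
//   Optional<int>{} == none                 // true
//   Optional<int>{} < Optional<int>{0}      // true
//   Optional<int>{1} == 1                   // true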
+ +template <class U, class V> +bool operator==(const Optional<U>& a, const V& b) { + return a.hasValue() && a.value() == b; +} + +template <class U, class V> +bool operator!=(const Optional<U>& a, const V& b) { + return !(a == b); +} + +template <class U, class V> +bool operator==(const U& a, const Optional<V>& b) { + return b.hasValue() && b.value() == a; +} + +template <class U, class V> +bool operator!=(const U& a, const Optional<V>& b) { + return !(a == b); +} + +template <class U, class V> +bool operator==(const Optional<U>& a, const Optional<V>& b) { + if (a.hasValue() != b.hasValue()) { + return false; + } + if (a.hasValue()) { + return a.value() == b.value(); + } + return true; +} + +template <class U, class V> +bool operator!=(const Optional<U>& a, const Optional<V>& b) { + return !(a == b); +} + +template <class U, class V> +bool operator<(const Optional<U>& a, const Optional<V>& b) { + if (a.hasValue() != b.hasValue()) { + return a.hasValue() < b.hasValue(); + } + if (a.hasValue()) { + return a.value() < b.value(); + } + return false; +} + +template <class U, class V> +bool operator>(const Optional<U>& a, const Optional<V>& b) { + return b < a; +} + +template <class U, class V> +bool operator<=(const Optional<U>& a, const Optional<V>& b) { + return !(b < a); +} + +template <class U, class V> +bool operator>=(const Optional<U>& a, const Optional<V>& b) { + return !(a < b); +} + +// Suppress comparability of Optional<T> with T, despite implicit conversion. +template <class V> +bool operator<(const Optional<V>&, const V& other) = delete; +template <class V> +bool operator<=(const Optional<V>&, const V& other) = delete; +template <class V> +bool operator>=(const Optional<V>&, const V& other) = delete; +template <class V> +bool operator>(const Optional<V>&, const V& other) = delete; +template <class V> +bool operator<(const V& other, const Optional<V>&) = delete; +template <class V> +bool operator<=(const V& other, const Optional<V>&) = delete; +template <class V> +bool operator>=(const V& other, const Optional<V>&) = delete; +template <class V> +bool operator>(const V& other, const Optional<V>&) = delete; + +// Comparisons with none +template <class V> +bool operator==(const Optional<V>& a, None) noexcept { + return !a.hasValue(); +} +template <class V> +bool operator==(None, const Optional<V>& a) noexcept { + return !a.hasValue(); +} +template <class V> +bool operator<(const Optional<V>&, None) noexcept { + return false; +} +template <class V> +bool operator<(None, const Optional<V>& a) noexcept { + return a.hasValue(); +} +template <class V> +bool operator>(const Optional<V>& a, None) noexcept { + return a.hasValue(); +} +template <class V> +bool operator>(None, const Optional<V>&) noexcept { + return false; +} +template <class V> +bool operator<=(None, const Optional<V>&) noexcept { + return true; +} +template <class V> +bool operator<=(const Optional<V>& a, None) noexcept { + return !a.hasValue(); +} +template <class V> +bool operator>=(const Optional<V>&, None) noexcept { + return true; +} +template <class V> +bool operator>=(None, const Optional<V>& a) noexcept { + return !a.hasValue(); +} + +/////////////////////////////////////////////////////////////////////////////// + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/Portability.h b/src/rocksdb/third-party/folly/folly/Portability.h new file mode 100644 index 000000000..61c05ff22 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/Portability.h @@ -0,0 +1,84 @@ +// Copyright (c) 2011-present, 
Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/CPortability.h> + +#if defined(__arm__) +#define FOLLY_ARM 1 +#else +#define FOLLY_ARM 0 +#endif + +#if defined(__x86_64__) || defined(_M_X64) +#define FOLLY_X64 1 +#else +#define FOLLY_X64 0 +#endif + +#if defined(__aarch64__) +#define FOLLY_AARCH64 1 +#else +#define FOLLY_AARCH64 0 +#endif + +#if defined(__powerpc64__) +#define FOLLY_PPC64 1 +#else +#define FOLLY_PPC64 0 +#endif + +#if defined(__has_builtin) +#define FOLLY_HAS_BUILTIN(...) __has_builtin(__VA_ARGS__) +#else +#define FOLLY_HAS_BUILTIN(...) 0 +#endif + +#if defined(__has_cpp_attribute) +#if __has_cpp_attribute(nodiscard) +#define FOLLY_NODISCARD [[nodiscard]] +#endif +#endif +#if !defined FOLLY_NODISCARD +#if defined(_MSC_VER) && (_MSC_VER >= 1700) +#define FOLLY_NODISCARD _Check_return_ +#elif defined(__GNUC__) +#define FOLLY_NODISCARD __attribute__((__warn_unused_result__)) +#else +#define FOLLY_NODISCARD +#endif +#endif + +namespace folly { +constexpr bool kIsArchArm = FOLLY_ARM == 1; +constexpr bool kIsArchAmd64 = FOLLY_X64 == 1; +constexpr bool kIsArchAArch64 = FOLLY_AARCH64 == 1; +constexpr bool kIsArchPPC64 = FOLLY_PPC64 == 1; +} // namespace folly + +namespace folly { +#ifdef NDEBUG +constexpr auto kIsDebug = false; +#else +constexpr auto kIsDebug = true; +#endif +} // namespace folly + +namespace folly { +#if defined(_MSC_VER) +constexpr bool kIsMsvc = true; +#else +constexpr bool kIsMsvc = false; +#endif +} // namespace folly + +namespace folly { +#if FOLLY_SANITIZE_THREAD +constexpr bool kIsSanitizeThread = true; +#else +constexpr bool kIsSanitizeThread = false; +#endif +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/ScopeGuard.h b/src/rocksdb/third-party/folly/folly/ScopeGuard.h new file mode 100644 index 000000000..711344063 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/ScopeGuard.h @@ -0,0 +1,54 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Traits.h> + +#include <utility> +#include <type_traits> + +namespace folly { +namespace scope_guard_detail { +template <typename F> +class ScopeGuardImpl { + public: + explicit ScopeGuardImpl(F&& f) : f_{std::forward<F>(f)} {} + ~ScopeGuardImpl() { + f_(); + } + + private: + F f_; +}; + +enum class ScopeGuardEnum {}; +template <typename Func, typename DecayedFunc = _t<std::decay<Func>>> +ScopeGuardImpl<DecayedFunc> operator+(ScopeGuardEnum, Func&& func) { + return ScopeGuardImpl<DecayedFunc>{std::forward<Func>(func)}; +} +} // namespace scope_guard_detail +} // namespace folly + +/** + * FB_ANONYMOUS_VARIABLE(str) introduces an identifier starting with + * str and ending with a number that varies with the line. 
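 *
 * With __COUNTER__ available, FB_ANONYMOUS_VARIABLE(SCOPE_EXIT_STATE) expands to
 * something like SCOPE_EXIT_STATE7_42 (counter 7, line 42), so every expansion
 * names a distinct variable. Its main client here is the SCOPE_EXIT macro
 * defined below; a minimal illustrative use:
 *
 *   FILE* fp = std::fopen("data.txt", "r");
 *   SCOPE_EXIT { std::fclose(fp); };   // runs when the enclosing scope exits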
+ */ +#ifndef FB_ANONYMOUS_VARIABLE +#define FB_CONCATENATE_IMPL(s1, s2) s1##s2 +#define FB_CONCATENATE(s1, s2) FB_CONCATENATE_IMPL(s1, s2) +#ifdef __COUNTER__ +#define FB_ANONYMOUS_VARIABLE(str) \ + FB_CONCATENATE(FB_CONCATENATE(FB_CONCATENATE(str, __COUNTER__), _), __LINE__) +#else +#define FB_ANONYMOUS_VARIABLE(str) FB_CONCATENATE(str, __LINE__) +#endif +#endif + +#ifndef SCOPE_EXIT +#define SCOPE_EXIT \ + auto FB_ANONYMOUS_VARIABLE(SCOPE_EXIT_STATE) = \ + ::folly::scope_guard_detail::ScopeGuardEnum{} + [&]() noexcept +#endif diff --git a/src/rocksdb/third-party/folly/folly/Traits.h b/src/rocksdb/third-party/folly/folly/Traits.h new file mode 100644 index 000000000..ea7e1eb1c --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/Traits.h @@ -0,0 +1,152 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <type_traits> +#include <utility> + +namespace folly { + +#if !defined(_MSC_VER) +template <class T> +struct is_trivially_copyable + : std::integral_constant<bool, __has_trivial_copy(T)> {}; +#else +template <class T> +using is_trivially_copyable = std::is_trivially_copyable<T>; +#endif + +/*** + * _t + * + * Instead of: + * + * using decayed = typename std::decay<T>::type; + * + * With the C++14 standard trait aliases, we could use: + * + * using decayed = std::decay_t<T>; + * + * Without them, we could use: + * + * using decayed = _t<std::decay<T>>; + * + * Also useful for any other library with template types having dependent + * member types named `type`, like the standard trait types. + */ +template <typename T> +using _t = typename T::type; + +/** + * type_t + * + * A type alias for the first template type argument. `type_t` is useful for + * controlling class-template and function-template partial specialization. + * + * Example: + * + * template <typename Value> + * class Container { + * public: + * template <typename... Args> + * Container( + * type_t<in_place_t, decltype(Value(std::declval<Args>()...))>, + * Args&&...); + * }; + * + * void_t + * + * A type alias for `void`. `void_t` is useful for controling class-template + * and function-template partial specialization. + * + * Example: + * + * // has_value_type<T>::value is true if T has a nested type `value_type` + * template <class T, class = void> + * struct has_value_type + * : std::false_type {}; + * + * template <class T> + * struct has_value_type<T, folly::void_t<typename T::value_type>> + * : std::true_type {}; + */ + +/** + * There is a bug in libstdc++, libc++, and MSVC's STL that causes it to + * ignore unused template parameter arguments in template aliases and does not + * cause substitution failures. This defect has been recorded here: + * http://open-std.org/JTC1/SC22/WG21/docs/cwg_defects.html#1558. + * + * This causes the implementation of std::void_t to be buggy, as it is likely + * defined as something like the following: + * + * template <typename...> + * using void_t = void; + * + * This causes the compiler to ignore all the template arguments and does not + * help when one wants to cause substitution failures. Rather declarations + * which have void_t in orthogonal specializations are treated as the same. 
+ * For example, assuming the possible `T` types are only allowed to have + * either the alias `one` or `two` and never both or none: + * + * template <typename T, + * typename std::void_t<std::decay_t<T>::one>* = nullptr> + * void foo(T&&) {} + * template <typename T, + * typename std::void_t<std::decay_t<T>::two>* = nullptr> + * void foo(T&&) {} + * + * The second foo() will be a redefinition because it conflicts with the first + * one; void_t does not cause substitution failures - the template types are + * just ignored. + */ + +namespace traits_detail { +template <class T, class...> +struct type_t_ { + using type = T; +}; +} // namespace traits_detail + +template <class T, class... Ts> +using type_t = typename traits_detail::type_t_<T, Ts...>::type; +template <class... Ts> +using void_t = type_t<void, Ts...>; + +/** + * A type trait to remove all const volatile and reference qualifiers on a + * type T + */ +template <typename T> +struct remove_cvref { + using type = + typename std::remove_cv<typename std::remove_reference<T>::type>::type; +}; +template <typename T> +using remove_cvref_t = typename remove_cvref<T>::type; + +template <class T> +struct IsNothrowSwappable + : std::integral_constant< + bool, + std::is_nothrow_move_constructible<T>::value&& noexcept( + std::swap(std::declval<T&>(), std::declval<T&>()))> {}; + +template <typename...> +struct Conjunction : std::true_type {}; +template <typename T> +struct Conjunction<T> : T {}; +template <typename T, typename... TList> +struct Conjunction<T, TList...> + : std::conditional<T::value, Conjunction<TList...>, T>::type {}; + +template <typename T> +struct Negation : std::integral_constant<bool, !T::value> {}; + +template <std::size_t I> +using index_constant = std::integral_constant<std::size_t, I>; + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/Unit.h b/src/rocksdb/third-party/folly/folly/Unit.h new file mode 100644 index 000000000..c8cb77e2c --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/Unit.h @@ -0,0 +1,59 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <type_traits> + +namespace folly { + +/// In functional programming, the degenerate case is often called "unit". In +/// C++, "void" is often the best analogue. However, because of the syntactic +/// special-casing required for void, it is frequently a liability for template +/// metaprogramming. So, instead of writing specializations to handle cases like +/// SomeContainer<void>, a library author may instead rule that out and simply +/// have library users use SomeContainer<Unit>. Contained values may be ignored. +/// Much easier. +/// +/// "void" is the type that admits of no values at all. It is not possible to +/// construct a value of this type. +/// "unit" is the type that admits of precisely one unique value. It is +/// possible to construct a value of this type, but it is always the same value +/// every time, so it is uninteresting. 
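///
/// A typical (purely illustrative) use is a single generic wrapper instead of a
/// void specialization: store lift_unit_t<T> (defined below) internally so that
/// T = void becomes Unit, and translate back with drop_unit_t where the interface
/// should still read as void. For example, a hypothetical Result<T> holding a
/// lift_unit_t<T> member works unchanged for both Result<int> and Result<void>.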
+struct Unit { + constexpr bool operator==(const Unit& /*other*/) const { + return true; + } + constexpr bool operator!=(const Unit& /*other*/) const { + return false; + } +}; + +constexpr Unit unit{}; + +template <typename T> +struct lift_unit { + using type = T; +}; +template <> +struct lift_unit<void> { + using type = Unit; +}; +template <typename T> +using lift_unit_t = typename lift_unit<T>::type; + +template <typename T> +struct drop_unit { + using type = T; +}; +template <> +struct drop_unit<Unit> { + using type = void; +}; +template <typename T> +using drop_unit_t = typename drop_unit<T>::type; + +} // namespace folly + diff --git a/src/rocksdb/third-party/folly/folly/Utility.h b/src/rocksdb/third-party/folly/folly/Utility.h new file mode 100644 index 000000000..7e43bdc2f --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/Utility.h @@ -0,0 +1,141 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <utility> +#include <type_traits> + +namespace folly { + +/** + * Backports from C++17 of: + * std::in_place_t + * std::in_place_type_t + * std::in_place_index_t + * std::in_place + * std::in_place_type + * std::in_place_index + */ + +struct in_place_tag {}; +template <class> +struct in_place_type_tag {}; +template <std::size_t> +struct in_place_index_tag {}; + +using in_place_t = in_place_tag (&)(in_place_tag); +template <class T> +using in_place_type_t = in_place_type_tag<T> (&)(in_place_type_tag<T>); +template <std::size_t I> +using in_place_index_t = in_place_index_tag<I> (&)(in_place_index_tag<I>); + +inline in_place_tag in_place(in_place_tag = {}) { + return {}; +} +template <class T> +inline in_place_type_tag<T> in_place_type(in_place_type_tag<T> = {}) { + return {}; +} +template <std::size_t I> +inline in_place_index_tag<I> in_place_index(in_place_index_tag<I> = {}) { + return {}; +} + +template <class T, class U = T> +T exchange(T& obj, U&& new_value) { + T old_value = std::move(obj); + obj = std::forward<U>(new_value); + return old_value; +} + +namespace utility_detail { +template <typename...> +struct make_seq_cat; +template < + template <typename T, T...> class S, + typename T, + T... Ta, + T... Tb, + T... Tc> +struct make_seq_cat<S<T, Ta...>, S<T, Tb...>, S<T, Tc...>> { + using type = + S<T, + Ta..., + (sizeof...(Ta) + Tb)..., + (sizeof...(Ta) + sizeof...(Tb) + Tc)...>; +}; + +// Not parameterizing by `template <typename T, T...> class, typename` because +// clang precisely v4.0 fails to compile that. Note that clang v3.9 and v5.0 +// handle that code correctly. +// +// For this to work, `S0` is required to be `Sequence<T>` and `S1` is required +// to be `Sequence<T, 0>`. + +template <std::size_t Size> +struct make_seq { + template <typename S0, typename S1> + using apply = typename make_seq_cat< + typename make_seq<Size / 2>::template apply<S0, S1>, + typename make_seq<Size / 2>::template apply<S0, S1>, + typename make_seq<Size % 2>::template apply<S0, S1>>::type; +}; +template <> +struct make_seq<1> { + template <typename S0, typename S1> + using apply = S1; +}; +template <> +struct make_seq<0> { + template <typename S0, typename S1> + using apply = S0; +}; +} // namespace utility_detail + +// TODO: Remove after upgrading to C++14 baseline + +template <class T, T... 
Ints> +struct integer_sequence { + using value_type = T; + + static constexpr std::size_t size() noexcept { + return sizeof...(Ints); + } +}; + +template <std::size_t... Ints> +using index_sequence = integer_sequence<std::size_t, Ints...>; + +template <typename T, std::size_t Size> +using make_integer_sequence = typename utility_detail::make_seq< + Size>::template apply<integer_sequence<T>, integer_sequence<T, 0>>; + +template <std::size_t Size> +using make_index_sequence = make_integer_sequence<std::size_t, Size>; +template <class... T> +using index_sequence_for = make_index_sequence<sizeof...(T)>; + +/** + * A simple helper for getting a constant reference to an object. + * + * Example: + * + * std::vector<int> v{1,2,3}; + * // The following two lines are equivalent: + * auto a = const_cast<const std::vector<int>&>(v).begin(); + * auto b = folly::as_const(v).begin(); + * + * Like C++17's std::as_const. See http://wg21.link/p0007 + */ +template <class T> +T const& as_const(T& t) noexcept { + return t; +} + +template <class T> +void as_const(T const&&) = delete; + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/chrono/Hardware.h b/src/rocksdb/third-party/folly/folly/chrono/Hardware.h new file mode 100644 index 000000000..ec7be82e8 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/chrono/Hardware.h @@ -0,0 +1,33 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Portability.h> + +#include <chrono> +#include <cstdint> + +#if _MSC_VER +extern "C" std::uint64_t __rdtsc(); +#pragma intrinsic(__rdtsc) +#endif + +namespace folly { + +inline std::uint64_t hardware_timestamp() { +#if _MSC_VER + return __rdtsc(); +#elif __GNUC__ && (__i386__ || FOLLY_X64) + return __builtin_ia32_rdtsc(); +#else + // use steady_clock::now() as an approximation for the timestamp counter on + // non-x86 systems + return std::chrono::steady_clock::now().time_since_epoch().count(); +#endif +} + +} // namespace folly + diff --git a/src/rocksdb/third-party/folly/folly/container/Array.h b/src/rocksdb/third-party/folly/folly/container/Array.h new file mode 100644 index 000000000..bb3167b97 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/container/Array.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <array> +#include <type_traits> +#include <utility> + +#include <folly/Traits.h> +#include <folly/Utility.h> + +namespace folly { + +namespace array_detail { +template <typename> +struct is_ref_wrapper : std::false_type {}; +template <typename T> +struct is_ref_wrapper<std::reference_wrapper<T>> : std::true_type {}; + +template <typename T> +using not_ref_wrapper = + folly::Negation<is_ref_wrapper<typename std::decay<T>::type>>; + +template <typename D, typename...> +struct return_type_helper { + using type = D; +}; +template <typename... 
TList> +struct return_type_helper<void, TList...> { + static_assert( + folly::Conjunction<not_ref_wrapper<TList>...>::value, + "TList cannot contain reference_wrappers when D is void"); + using type = typename std::common_type<TList...>::type; +}; + +template <typename D, typename... TList> +using return_type = std:: + array<typename return_type_helper<D, TList...>::type, sizeof...(TList)>; +} // namespace array_detail + +template <typename D = void, typename... TList> +constexpr array_detail::return_type<D, TList...> make_array(TList&&... t) { + using value_type = + typename array_detail::return_type_helper<D, TList...>::type; + return {{static_cast<value_type>(std::forward<TList>(t))...}}; +} + +namespace array_detail { +template <typename MakeItem, std::size_t... Index> +inline constexpr auto make_array_with( + MakeItem const& make, + folly::index_sequence<Index...>) + -> std::array<decltype(make(0)), sizeof...(Index)> { + return std::array<decltype(make(0)), sizeof...(Index)>{{make(Index)...}}; +} +} // namespace array_detail + +// make_array_with +// +// Constructs a std::array<..., Size> with elements m(i) for i in [0, Size). +template <std::size_t Size, typename MakeItem> +constexpr auto make_array_with(MakeItem const& make) + -> decltype(array_detail::make_array_with( + make, + folly::make_index_sequence<Size>{})) { + return array_detail::make_array_with( + make, + folly::make_index_sequence<Size>{}); +} + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/detail/Futex-inl.h b/src/rocksdb/third-party/folly/folly/detail/Futex-inl.h new file mode 100644 index 000000000..3b2a412bf --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/detail/Futex-inl.h @@ -0,0 +1,117 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/detail/Futex.h> +#include <folly/synchronization/ParkingLot.h> + +namespace folly { +namespace detail { + +/** Optimal when TargetClock is the same type as Clock. + * + * Otherwise, both Clock::now() and TargetClock::now() must be invoked. */ +template <typename TargetClock, typename Clock, typename Duration> +typename TargetClock::time_point time_point_conv( + std::chrono::time_point<Clock, Duration> const& time) { + using std::chrono::duration_cast; + using TimePoint = std::chrono::time_point<Clock, Duration>; + using TargetDuration = typename TargetClock::duration; + using TargetTimePoint = typename TargetClock::time_point; + if (time == TimePoint::max()) { + return TargetTimePoint::max(); + } else if (std::is_same<Clock, TargetClock>::value) { + // in place of time_point_cast, which cannot compile without if-constexpr + auto const delta = time.time_since_epoch(); + return TargetTimePoint(duration_cast<TargetDuration>(delta)); + } else { + // different clocks with different epochs, so non-optimal case + auto const delta = time - Clock::now(); + return TargetClock::now() + duration_cast<TargetDuration>(delta); + } +} + +/** + * Available overloads, with definitions elsewhere + * + * These functions are treated as ADL-extension points, the templates above + * call these functions without them having being pre-declared. 
This works + * because ADL lookup finds the definitions of these functions when you pass + * the relevant arguments + */ +int futexWakeImpl( + const Futex<std::atomic>* futex, + int count, + uint32_t wakeMask); +FutexResult futexWaitImpl( + const Futex<std::atomic>* futex, + uint32_t expected, + std::chrono::system_clock::time_point const* absSystemTime, + std::chrono::steady_clock::time_point const* absSteadyTime, + uint32_t waitMask); + +int futexWakeImpl( + const Futex<EmulatedFutexAtomic>* futex, + int count, + uint32_t wakeMask); +FutexResult futexWaitImpl( + const Futex<EmulatedFutexAtomic>* futex, + uint32_t expected, + std::chrono::system_clock::time_point const* absSystemTime, + std::chrono::steady_clock::time_point const* absSteadyTime, + uint32_t waitMask); + +template <typename Futex, typename Deadline> +typename std::enable_if<Deadline::clock::is_steady, FutexResult>::type +futexWaitImpl( + Futex* futex, + uint32_t expected, + Deadline const& deadline, + uint32_t waitMask) { + return futexWaitImpl(futex, expected, nullptr, &deadline, waitMask); +} + +template <typename Futex, typename Deadline> +typename std::enable_if<!Deadline::clock::is_steady, FutexResult>::type +futexWaitImpl( + Futex* futex, + uint32_t expected, + Deadline const& deadline, + uint32_t waitMask) { + return futexWaitImpl(futex, expected, &deadline, nullptr, waitMask); +} + +template <typename Futex> +FutexResult +futexWait(const Futex* futex, uint32_t expected, uint32_t waitMask) { + auto rv = futexWaitImpl(futex, expected, nullptr, nullptr, waitMask); + assert(rv != FutexResult::TIMEDOUT); + return rv; +} + +template <typename Futex> +int futexWake(const Futex* futex, int count, uint32_t wakeMask) { + return futexWakeImpl(futex, count, wakeMask); +} + +template <typename Futex, class Clock, class Duration> +FutexResult futexWaitUntil( + const Futex* futex, + uint32_t expected, + std::chrono::time_point<Clock, Duration> const& deadline, + uint32_t waitMask) { + using Target = typename std::conditional< + Clock::is_steady, + std::chrono::steady_clock, + std::chrono::system_clock>::type; + auto const converted = time_point_conv<Target>(deadline); + return converted == Target::time_point::max() + ? futexWaitImpl(futex, expected, nullptr, nullptr, waitMask) + : futexWaitImpl(futex, expected, converted, waitMask); +} + +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/detail/Futex.cpp b/src/rocksdb/third-party/folly/folly/detail/Futex.cpp new file mode 100644 index 000000000..62d6ea2b2 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/detail/Futex.cpp @@ -0,0 +1,263 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <folly/detail/Futex.h> +#include <folly/portability/SysSyscall.h> +#include <stdint.h> +#include <string.h> +#include <array> +#include <cerrno> + +#include <folly/synchronization/ParkingLot.h> + +#ifdef __linux__ +#include <linux/futex.h> +#endif + +#ifndef _WIN32 +#include <unistd.h> +#endif + +using namespace std::chrono; + +namespace folly { +namespace detail { + +namespace { + +//////////////////////////////////////////////////// +// native implementation using the futex() syscall + +#ifdef __linux__ + +/// Certain toolchains (like Android's) don't include the full futex API in +/// their headers even though they support it. 
Make sure we have our constants +/// even if the headers don't have them. +#ifndef FUTEX_WAIT_BITSET +#define FUTEX_WAIT_BITSET 9 +#endif +#ifndef FUTEX_WAKE_BITSET +#define FUTEX_WAKE_BITSET 10 +#endif +#ifndef FUTEX_PRIVATE_FLAG +#define FUTEX_PRIVATE_FLAG 128 +#endif +#ifndef FUTEX_CLOCK_REALTIME +#define FUTEX_CLOCK_REALTIME 256 +#endif + +int nativeFutexWake(const void* addr, int count, uint32_t wakeMask) { + long rv = syscall( + __NR_futex, + addr, /* addr1 */ + FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */ + count, /* val */ + nullptr, /* timeout */ + nullptr, /* addr2 */ + wakeMask); /* val3 */ + + /* NOTE: we ignore errors on wake for the case of a futex + guarding its own destruction, similar to this + glibc bug with sem_post/sem_wait: + https://sourceware.org/bugzilla/show_bug.cgi?id=12674 */ + if (rv < 0) { + return 0; + } + return static_cast<int>(rv); +} + +template <class Clock> +struct timespec timeSpecFromTimePoint(time_point<Clock> absTime) { + auto epoch = absTime.time_since_epoch(); + if (epoch.count() < 0) { + // kernel timespec_valid requires non-negative seconds and nanos in [0,1G) + epoch = Clock::duration::zero(); + } + + // timespec-safe seconds and nanoseconds; + // chrono::{nano,}seconds are `long long int` + // whereas timespec uses smaller types + using time_t_seconds = duration<std::time_t, seconds::period>; + using long_nanos = duration<long int, nanoseconds::period>; + + auto secs = duration_cast<time_t_seconds>(epoch); + auto nanos = duration_cast<long_nanos>(epoch - secs); + struct timespec result = {secs.count(), nanos.count()}; + return result; +} + +FutexResult nativeFutexWaitImpl( + const void* addr, + uint32_t expected, + system_clock::time_point const* absSystemTime, + steady_clock::time_point const* absSteadyTime, + uint32_t waitMask) { + assert(absSystemTime == nullptr || absSteadyTime == nullptr); + + int op = FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG; + struct timespec ts; + struct timespec* timeout = nullptr; + + if (absSystemTime != nullptr) { + op |= FUTEX_CLOCK_REALTIME; + ts = timeSpecFromTimePoint(*absSystemTime); + timeout = &ts; + } else if (absSteadyTime != nullptr) { + ts = timeSpecFromTimePoint(*absSteadyTime); + timeout = &ts; + } + + // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout + // value - http://locklessinc.com/articles/futex_cheat_sheet/ + long rv = syscall( + __NR_futex, + addr, /* addr1 */ + op, /* op */ + expected, /* val */ + timeout, /* timeout */ + nullptr, /* addr2 */ + waitMask); /* val3 */ + + if (rv == 0) { + return FutexResult::AWOKEN; + } else { + switch (errno) { + case ETIMEDOUT: + assert(timeout != nullptr); + return FutexResult::TIMEDOUT; + case EINTR: + return FutexResult::INTERRUPTED; + case EWOULDBLOCK: + return FutexResult::VALUE_CHANGED; + default: + assert(false); + // EINVAL, EACCESS, or EFAULT. EINVAL means there was an invalid + // op (should be impossible) or an invalid timeout (should have + // been sanitized by timeSpecFromTimePoint). EACCESS or EFAULT + // means *addr points to invalid memory, which is unlikely because + // the caller should have segfaulted already. We can either + // crash, or return a value that lets the process continue for + // a bit. We choose the latter. VALUE_CHANGED probably turns the + // caller into a spin lock. 
+ return FutexResult::VALUE_CHANGED; + } + } +} + +#endif // __linux__ + +/////////////////////////////////////////////////////// +// compatibility implementation using standard C++ API + +using Lot = ParkingLot<uint32_t>; +Lot parkingLot; + +int emulatedFutexWake(const void* addr, int count, uint32_t waitMask) { + int woken = 0; + parkingLot.unpark(addr, [&](const uint32_t& mask) { + if ((mask & waitMask) == 0) { + return UnparkControl::RetainContinue; + } + assert(count > 0); + count--; + woken++; + return count > 0 ? UnparkControl::RemoveContinue + : UnparkControl::RemoveBreak; + }); + return woken; +} + +template <typename F> +FutexResult emulatedFutexWaitImpl( + F* futex, + uint32_t expected, + system_clock::time_point const* absSystemTime, + steady_clock::time_point const* absSteadyTime, + uint32_t waitMask) { + static_assert( + std::is_same<F, const Futex<std::atomic>>::value || + std::is_same<F, const Futex<EmulatedFutexAtomic>>::value, + "Type F must be either Futex<std::atomic> or Futex<EmulatedFutexAtomic>"); + ParkResult res; + if (absSystemTime) { + res = parkingLot.park_until( + futex, + waitMask, + [&] { return *futex == expected; }, + [] {}, + *absSystemTime); + } else if (absSteadyTime) { + res = parkingLot.park_until( + futex, + waitMask, + [&] { return *futex == expected; }, + [] {}, + *absSteadyTime); + } else { + res = parkingLot.park( + futex, waitMask, [&] { return *futex == expected; }, [] {}); + } + switch (res) { + case ParkResult::Skip: + return FutexResult::VALUE_CHANGED; + case ParkResult::Unpark: + return FutexResult::AWOKEN; + case ParkResult::Timeout: + return FutexResult::TIMEDOUT; + } + + return FutexResult::INTERRUPTED; +} + +} // namespace + +///////////////////////////////// +// Futex<> overloads + +int futexWakeImpl( + const Futex<std::atomic>* futex, + int count, + uint32_t wakeMask) { +#ifdef __linux__ + return nativeFutexWake(futex, count, wakeMask); +#else + return emulatedFutexWake(futex, count, wakeMask); +#endif +} + +int futexWakeImpl( + const Futex<EmulatedFutexAtomic>* futex, + int count, + uint32_t wakeMask) { + return emulatedFutexWake(futex, count, wakeMask); +} + +FutexResult futexWaitImpl( + const Futex<std::atomic>* futex, + uint32_t expected, + system_clock::time_point const* absSystemTime, + steady_clock::time_point const* absSteadyTime, + uint32_t waitMask) { +#ifdef __linux__ + return nativeFutexWaitImpl( + futex, expected, absSystemTime, absSteadyTime, waitMask); +#else + return emulatedFutexWaitImpl( + futex, expected, absSystemTime, absSteadyTime, waitMask); +#endif +} + +FutexResult futexWaitImpl( + const Futex<EmulatedFutexAtomic>* futex, + uint32_t expected, + system_clock::time_point const* absSystemTime, + steady_clock::time_point const* absSteadyTime, + uint32_t waitMask) { + return emulatedFutexWaitImpl( + futex, expected, absSystemTime, absSteadyTime, waitMask); +} + +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/detail/Futex.h b/src/rocksdb/third-party/folly/folly/detail/Futex.h new file mode 100644 index 000000000..987a1b895 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/detail/Futex.h @@ -0,0 +1,96 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include <atomic> +#include <cassert> +#include <chrono> +#include <cstdint> +#include <limits> +#include <type_traits> + +namespace folly { +namespace detail { + +enum class FutexResult { + VALUE_CHANGED, /* futex value didn't match expected */ + AWOKEN, /* wakeup by matching futex wake, or spurious wakeup */ + INTERRUPTED, /* wakeup by interrupting signal */ + TIMEDOUT, /* wakeup by expiring deadline */ +}; + +/** + * Futex is an atomic 32 bit unsigned integer that provides access to the + * futex() syscall on that value. It is templated in such a way that it + * can interact properly with DeterministicSchedule testing. + * + * If you don't know how to use futex(), you probably shouldn't be using + * this class. Even if you do know how, you should have a good reason + * (and benchmarks to back you up). + * + * Because of the semantics of the futex syscall, the futex family of + * functions are available as free functions rather than member functions + */ +template <template <typename> class Atom = std::atomic> +using Futex = Atom<std::uint32_t>; + +/** + * Puts the thread to sleep if this->load() == expected. Returns true when + * it is returning because it has consumed a wake() event, false for any + * other return (signal, this->load() != expected, or spurious wakeup). + */ +template <typename Futex> +FutexResult +futexWait(const Futex* futex, uint32_t expected, uint32_t waitMask = -1); + +/** + * Similar to futexWait but also accepts a deadline until when the wait call + * may block. + * + * Optimal clock types: std::chrono::system_clock, std::chrono::steady_clock. + * NOTE: On some systems steady_clock is just an alias for system_clock, + * and is not actually steady. + * + * For any other clock type, now() will be invoked twice. + */ +template <typename Futex, class Clock, class Duration> +FutexResult futexWaitUntil( + const Futex* futex, + uint32_t expected, + std::chrono::time_point<Clock, Duration> const& deadline, + uint32_t waitMask = -1); + +/** + * Wakes up to count waiters where (waitMask & wakeMask) != 0, returning the + * number of awoken threads, or -1 if an error occurred. Note that when + * constructing a concurrency primitive that can guard its own destruction, it + * is likely that you will want to ignore EINVAL here (as well as making sure + * that you never touch the object after performing the memory store that is + * the linearization point for unlock or control handoff). See + * https://sourceware.org/bugzilla/show_bug.cgi?id=13690 + */ +template <typename Futex> +int futexWake( + const Futex* futex, + int count = std::numeric_limits<int>::max(), + uint32_t wakeMask = -1); + +/** A std::atomic subclass that can be used to force Futex to emulate + * the underlying futex() syscall. This is primarily useful to test or + * benchmark the emulated implementation on systems that don't need it. 
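 *
 * For example (illustrative only), declaring
 *   Futex<EmulatedFutexAtomic> f{0};
 * routes futexWait(&f, 0) and futexWake(&f) through the emulated, ParkingLot-based
 * path even on Linux, which makes it easy to compare against the native syscall.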
*/ +template <typename T> +struct EmulatedFutexAtomic : public std::atomic<T> { + EmulatedFutexAtomic() noexcept = default; + constexpr /* implicit */ EmulatedFutexAtomic(T init) noexcept + : std::atomic<T>(init) {} + // It doesn't copy or move + EmulatedFutexAtomic(EmulatedFutexAtomic&& rhs) = delete; +}; + +} // namespace detail +} // namespace folly + +#include <folly/detail/Futex-inl.h> diff --git a/src/rocksdb/third-party/folly/folly/functional/Invoke.h b/src/rocksdb/third-party/folly/folly/functional/Invoke.h new file mode 100644 index 000000000..67c552843 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/functional/Invoke.h @@ -0,0 +1,40 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Traits.h> + +#include <functional> +#include <type_traits> + +namespace folly { +namespace invoke_detail { +template <typename F, typename... Args> +using invoke_result_ = decltype(std::declval<F>()(std::declval<Args>()...)); + +template <typename Void, typename F, typename... Args> +struct is_invocable : std::false_type {}; + +template <typename F, typename... Args> +struct is_invocable<void_t<invoke_result_<F, Args...>>, F, Args...> + : std::true_type {}; + +template <typename Void, typename R, typename F, typename... Args> +struct is_invocable_r : std::false_type {}; + +template <typename R, typename F, typename... Args> +struct is_invocable_r<void_t<invoke_result_<F, Args...>>, R, F, Args...> + : std::is_convertible<invoke_result_<F, Args...>, R> {}; +} // namespace invoke_detail + +// mimic: std::is_invocable, C++17 +template <typename F, typename... Args> +struct is_invocable : invoke_detail::is_invocable<void, F, Args...> {}; + +// mimic: std::is_invocable_r, C++17 +template <typename R, typename F, typename... Args> +struct is_invocable_r : invoke_detail::is_invocable_r<void, R, F, Args...> {}; +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/hash/Hash.h b/src/rocksdb/third-party/folly/folly/hash/Hash.h new file mode 100644 index 000000000..ca221e5c0 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/hash/Hash.h @@ -0,0 +1,29 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <cstdint> + +namespace folly { +namespace hash { + +/* + * Thomas Wang 64 bit mix hash function + */ + +inline uint64_t twang_mix64(uint64_t key) noexcept { + key = (~key) + (key << 21); // key *= (1 << 21) - 1; key -= 1; + key = key ^ (key >> 24); + key = key + (key << 3) + (key << 8); // key *= 1 + (1 << 3) + (1 << 8) + key = key ^ (key >> 14); + key = key + (key << 2) + (key << 4); // key *= 1 + (1 << 2) + (1 << 4) + key = key ^ (key >> 28); + key = key + (key << 31); // key *= 1 + (1 << 31) + return key; +} + +} // namespace hash +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/lang/Align.h b/src/rocksdb/third-party/folly/folly/lang/Align.h new file mode 100644 index 000000000..5257e2f6f --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/lang/Align.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <cstdint> + +namespace folly { + +// Memory locations within the same cache line are subject to destructive +// interference, also known as false sharing, which is when concurrent +// accesses to these different memory locations from different cores, where at +// least one of the concurrent accesses is or involves a store operation, +// induce contention and harm performance. +// +// Microbenchmarks indicate that pairs of cache lines also see destructive +// interference under heavy use of atomic operations, as observed for atomic +// increment on Sandy Bridge. +// +// We assume a cache line size of 64, so we use a cache line pair size of 128 +// to avoid destructive interference. +// +// mimic: std::hardware_destructive_interference_size, C++17 +constexpr std::size_t hardware_destructive_interference_size = 128; + +// Memory locations within the same cache line are subject to constructive +// interference, also known as true sharing, which is when accesses to some +// memory locations induce all memory locations within the same cache line to +// be cached, benefiting subsequent accesses to different memory locations +// within the same cache line and heping performance. +// +// mimic: std::hardware_constructive_interference_size, C++17 +constexpr std::size_t hardware_constructive_interference_size = 64; + +} // namespace folly + diff --git a/src/rocksdb/third-party/folly/folly/lang/Bits.h b/src/rocksdb/third-party/folly/folly/lang/Bits.h new file mode 100644 index 000000000..f3abeffc4 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/lang/Bits.h @@ -0,0 +1,30 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Traits.h> + +#include <cstdint> +#include <cstring> +#include <type_traits> + +namespace folly { + +template < + typename To, + typename From, + _t<std::enable_if< + sizeof(From) == sizeof(To) && std::is_trivial<To>::value && + is_trivially_copyable<From>::value, + int>> = 0> +To bit_cast(const From& src) noexcept { + To to; + std::memcpy(&to, &src, sizeof(From)); + return to; +} + +} // namespace folly + diff --git a/src/rocksdb/third-party/folly/folly/lang/Launder.h b/src/rocksdb/third-party/folly/folly/lang/Launder.h new file mode 100644 index 000000000..9247e3e33 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/lang/Launder.h @@ -0,0 +1,51 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <new> + +#include <folly/Portability.h> + +/*** + * include or backport: + * * std::launder + */ + +namespace folly { + +/** + * Approximate backport from C++17 of std::launder. It should be `constexpr` + * but that can't be done without specific support from the compiler. + */ +template <typename T> +FOLLY_NODISCARD inline T* launder(T* in) noexcept { +#if FOLLY_HAS_BUILTIN(__builtin_launder) || __GNUC__ >= 7 + // The builtin has no unwanted side-effects. 
+ return __builtin_launder(in); +#elif __GNUC__ + // This inline assembler block declares that `in` is an input and an output, + // so the compiler has to assume that it has been changed inside the block. + __asm__("" : "+r"(in)); + return in; +#elif defined(_WIN32) + // MSVC does not currently have optimizations around const members of structs. + // _ReadWriteBarrier() will prevent compiler reordering memory accesses. + _ReadWriteBarrier(); + return in; +#else + static_assert( + false, "folly::launder is not implemented for this environment"); +#endif +} + +/* The standard explicitly forbids laundering these */ +void launder(void*) = delete; +void launder(void const*) = delete; +void launder(void volatile*) = delete; +void launder(void const volatile*) = delete; +template <typename T, typename... Args> +void launder(T (*)(Args...)) = delete; +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/portability/Asm.h b/src/rocksdb/third-party/folly/folly/portability/Asm.h new file mode 100644 index 000000000..cca168586 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/portability/Asm.h @@ -0,0 +1,28 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Portability.h> + +#include <cstdint> + +#ifdef _MSC_VER +#include <intrin.h> +#endif + +namespace folly { +inline void asm_volatile_pause() { +#if defined(_MSC_VER) && (defined(_M_IX86) || defined(_M_X64)) + ::_mm_pause(); +#elif defined(__i386__) || FOLLY_X64 + asm volatile("pause"); +#elif FOLLY_AARCH64 || defined(__arm__) + asm volatile("yield"); +#elif FOLLY_PPC64 + asm volatile("or 27,27,27"); +#endif +} +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/portability/SysSyscall.h b/src/rocksdb/third-party/folly/folly/portability/SysSyscall.h new file mode 100644 index 000000000..fa969deb1 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/portability/SysSyscall.h @@ -0,0 +1,10 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef _WIN32 +#include <sys/syscall.h> +#endif diff --git a/src/rocksdb/third-party/folly/folly/portability/SysTypes.h b/src/rocksdb/third-party/folly/folly/portability/SysTypes.h new file mode 100644 index 000000000..7beb68cfb --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/portability/SysTypes.h @@ -0,0 +1,26 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <sys/types.h> + +#ifdef _WIN32 +#include <basetsd.h> // @manual + +#define HAVE_MODE_T 1 + +// This is a massive pain to have be an `int` due to the pthread implementation +// we support, but it's far more compatible with the rest of the windows world +// as an `int` than it would be as a `void*` +using pid_t = int; +// This isn't actually supposed to be defined here, but it's the most +// appropriate place without defining a portability header for stdint.h +// with just this single typedef. 
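Two of the smaller utilities above in one sketch: bit_cast (Bits.h), which mimics C++20 std::bit_cast for same-sized trivially copyable types, and asm_volatile_pause (Asm.h), the CPU spin-loop hint. The helper names and the spin bound are illustrative only:

    #include <atomic>
    #include <cstdint>
    #include <folly/lang/Bits.h>
    #include <folly/portability/Asm.h>

    // reinterpret a float's object representation without undefined behavior
    inline std::uint32_t float_bits(float f) {
      return folly::bit_cast<std::uint32_t>(f);
    }

    // bounded spin with the pause/yield hint before the caller yields or blocks
    inline bool spin_until_set(const std::atomic<bool>& flag, int max_spins) {
      for (int i = 0; i < max_spins; ++i) {
        if (flag.load(std::memory_order_acquire)) {
          return true;
        }
        folly::asm_volatile_pause();
      }
      return false;
    }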
+using ssize_t = SSIZE_T; +// The Windows headers don't define this anywhere, nor do any of the libs +// that Folly depends on, so define it here. +using mode_t = unsigned short; +#endif diff --git a/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification-inl.h b/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification-inl.h new file mode 100644 index 000000000..c0b143d0a --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification-inl.h @@ -0,0 +1,138 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/detail/Futex.h> +#include <folly/synchronization/ParkingLot.h> + +#include <condition_variable> +#include <cstdint> + +namespace folly { +namespace detail { +namespace atomic_notification { +/** + * We use Futex<std::atomic> as the alias that has the lowest performance + * overhead with respect to atomic notifications. Assert that + * atomic_uint_fast_wait_t is the same as Futex<std::atomic> + */ +static_assert(std::is_same<atomic_uint_fast_wait_t, Futex<std::atomic>>{}, ""); + +/** + * Implementation and specializations for the atomic_wait() family of + * functions + */ +inline std::cv_status toCvStatus(FutexResult result) { + return (result == FutexResult::TIMEDOUT) ? std::cv_status::timeout + : std::cv_status::no_timeout; +} +inline std::cv_status toCvStatus(ParkResult result) { + return (result == ParkResult::Timeout) ? std::cv_status::timeout + : std::cv_status::no_timeout; +} + +// ParkingLot instantiation for futex management +extern ParkingLot<std::uint32_t> parkingLot; + +template <template <typename...> class Atom, typename... Args> +void atomic_wait_impl( + const Atom<std::uint32_t, Args...>* atomic, + std::uint32_t expected) { + futexWait(atomic, expected); + return; +} + +template <template <typename...> class Atom, typename Integer, typename... Args> +void atomic_wait_impl(const Atom<Integer, Args...>* atomic, Integer expected) { + static_assert(!std::is_same<Integer, std::uint32_t>{}, ""); + parkingLot.park( + atomic, -1, [&] { return atomic->load() == expected; }, [] {}); +} + +template < + template <typename...> class Atom, + typename... Args, + typename Clock, + typename Duration> +std::cv_status atomic_wait_until_impl( + const Atom<std::uint32_t, Args...>* atomic, + std::uint32_t expected, + const std::chrono::time_point<Clock, Duration>& deadline) { + return toCvStatus(futexWaitUntil(atomic, expected, deadline)); +} + +template < + template <typename...> class Atom, + typename Integer, + typename... Args, + typename Clock, + typename Duration> +std::cv_status atomic_wait_until_impl( + const Atom<Integer, Args...>* atomic, + Integer expected, + const std::chrono::time_point<Clock, Duration>& deadline) { + static_assert(!std::is_same<Integer, std::uint32_t>{}, ""); + return toCvStatus(parkingLot.park_until( + atomic, -1, [&] { return atomic->load() == expected; }, [] {}, deadline)); +} + +template <template <typename...> class Atom, typename... Args> +void atomic_notify_one_impl(const Atom<std::uint32_t, Args...>* atomic) { + futexWake(atomic, 1); + return; +} + +template <template <typename...> class Atom, typename Integer, typename... 
Args> +void atomic_notify_one_impl(const Atom<Integer, Args...>* atomic) { + static_assert(!std::is_same<Integer, std::uint32_t>{}, ""); + parkingLot.unpark(atomic, [&](std::uint32_t data) { + assert(data == std::numeric_limits<std::uint32_t>::max()); + return UnparkControl::RemoveBreak; + }); +} + +template <template <typename...> class Atom, typename... Args> +void atomic_notify_all_impl(const Atom<std::uint32_t, Args...>* atomic) { + futexWake(atomic); + return; +} + +template <template <typename...> class Atom, typename Integer, typename... Args> +void atomic_notify_all_impl(const Atom<Integer, Args...>* atomic) { + static_assert(!std::is_same<Integer, std::uint32_t>{}, ""); + parkingLot.unpark(atomic, [&](std::uint32_t data) { + assert(data == std::numeric_limits<std::uint32_t>::max()); + return UnparkControl::RemoveContinue; + }); +} +} // namespace atomic_notification +} // namespace detail + +template <typename Integer> +void atomic_wait(const std::atomic<Integer>* atomic, Integer expected) { + detail::atomic_notification::atomic_wait_impl(atomic, expected); +} + +template <typename Integer, typename Clock, typename Duration> +std::cv_status atomic_wait_until( + const std::atomic<Integer>* atomic, + Integer expected, + const std::chrono::time_point<Clock, Duration>& deadline) { + return detail::atomic_notification::atomic_wait_until_impl( + atomic, expected, deadline); +} + +template <typename Integer> +void atomic_notify_one(const std::atomic<Integer>* atomic) { + detail::atomic_notification::atomic_notify_one_impl(atomic); +} + +template <typename Integer> +void atomic_notify_all(const std::atomic<Integer>* atomic) { + detail::atomic_notification::atomic_notify_all_impl(atomic); +} + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification.cpp b/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification.cpp new file mode 100644 index 000000000..b50875cd5 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification.cpp @@ -0,0 +1,23 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <folly/synchronization/AtomicNotification.h> + +#include <cstdint> + +namespace folly { +namespace detail { +namespace atomic_notification { + +// ParkingLot instance used for the atomic_wait() family of functions +// +// This has been defined as a static object (as opposed to allocated to avoid +// destruction order problems) because of possible uses coming from +// allocation-sensitive contexts. +ParkingLot<std::uint32_t> parkingLot; + +} // namespace atomic_notification +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification.h b/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification.h new file mode 100644 index 000000000..af87852cb --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/AtomicNotification.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include <atomic> +#include <condition_variable> + +namespace folly { + +/** + * The behavior of the atomic_wait() family of functions is semantically + * identical to futex(). Correspondingly, calling atomic_notify_one(), + * atomic_notify_all() is identical to futexWake() with 1 and + * std::numeric_limits<int>::max() respectively + * + * The difference here compared to the futex API above is that it works with + * all types of atomic widths. When a 32 bit atomic integer is used, the + * implementation falls back to using futex() if possible, and the + * compatibility implementation for non-linux systems otherwise. For all + * other integer widths, the compatibility implementation is used + * + * The templating of this API is changed from the standard in the following + * ways + * + * - At the time of writing, libstdc++'s implementation of std::atomic<> does + * not include the value_type alias. So we rely on the atomic type being a + * template class such that the first type is the underlying value type + * - The Atom parameter allows this API to be compatible with + * DeterministicSchedule testing. + * - atomic_wait_until() does not exist in the linked paper, the version here + * is identical to futexWaitUntil() and returns std::cv_status + */ +// mimic: std::atomic_wait, p1135r0 +template <typename Integer> +void atomic_wait(const std::atomic<Integer>* atomic, Integer expected); +template <typename Integer, typename Clock, typename Duration> +std::cv_status atomic_wait_until( + const std::atomic<Integer>* atomic, + Integer expected, + const std::chrono::time_point<Clock, Duration>& deadline); + +// mimic: std::atomic_notify_one, p1135r0 +template <typename Integer> +void atomic_notify_one(const std::atomic<Integer>* atomic); +// mimic: std::atomic_notify_all, p1135r0 +template <typename Integer> +void atomic_notify_all(const std::atomic<Integer>* atomic); + +// mimic: std::atomic_uint_fast_wait_t, p1135r0 +using atomic_uint_fast_wait_t = std::atomic<std::uint32_t>; + +} // namespace folly + +#include <folly/synchronization/AtomicNotification-inl.h> diff --git a/src/rocksdb/third-party/folly/folly/synchronization/AtomicUtil-inl.h b/src/rocksdb/third-party/folly/folly/synchronization/AtomicUtil-inl.h new file mode 100644 index 000000000..4c10d8451 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/AtomicUtil-inl.h @@ -0,0 +1,260 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Portability.h> + +#include <atomic> +#include <cassert> +#include <cstdint> +#include <tuple> +#include <type_traits> + +#if _WIN32 +#include <intrin.h> +#endif + +namespace folly { +namespace detail { + +// TODO: Remove the non-default implementations when both gcc and clang +// can recognize single bit set/reset patterns and compile them down to locked +// bts and btr instructions. 
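A usage sketch for the p1135r0-style wait/notify API declared in AtomicNotification.h above. The producer/consumer split is illustrative; as with futex(), callers loop because a return does not guarantee the value actually changed:

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <folly/synchronization/AtomicNotification.h>

    std::atomic<std::uint32_t> state{0};  // 32 bits wide, so the futex path applies

    void consumer() {
      while (state.load() == 0) {
        folly::atomic_wait(&state, std::uint32_t{0});  // may return spuriously
      }
    }

    bool consumer_until(std::chrono::steady_clock::time_point deadline) {
      while (state.load() == 0) {
        if (folly::atomic_wait_until(&state, std::uint32_t{0}, deadline) ==
            std::cv_status::timeout) {
          return false;
        }
      }
      return true;
    }

    void producer() {
      state.store(1);
      folly::atomic_notify_one(&state);  // atomic_notify_all() wakes every waiter
    }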
+// +// Currently, at the time of writing it seems like gcc7 and greater can make +// this optimization and clang cannot - https://gcc.godbolt.org/z/Q83rxX + +template <typename Atomic> +bool atomic_fetch_set_default( + Atomic& atomic, + std::size_t bit, + std::memory_order order) { + using Integer = decltype(atomic.load()); + auto mask = Integer{0b1} << static_cast<Integer>(bit); + return (atomic.fetch_or(mask, order) & mask); +} + +template <typename Atomic> +bool atomic_fetch_reset_default( + Atomic& atomic, + std::size_t bit, + std::memory_order order) { + using Integer = decltype(atomic.load()); + auto mask = Integer{0b1} << static_cast<Integer>(bit); + return (atomic.fetch_and(~mask, order) & mask); +} + +/** + * A simple trait to determine if the given type is an instantiation of + * std::atomic + */ +template <typename T> +struct is_atomic : std::false_type {}; +template <typename Integer> +struct is_atomic<std::atomic<Integer>> : std::true_type {}; + +#if FOLLY_X64 + +#if _MSC_VER + +template <typename Integer> +inline bool atomic_fetch_set_x86( + std::atomic<Integer>& atomic, + std::size_t bit, + std::memory_order order) { + static_assert(alignof(std::atomic<Integer>) == alignof(Integer), ""); + static_assert(sizeof(std::atomic<Integer>) == sizeof(Integer), ""); + assert(atomic.is_lock_free()); + + if /* constexpr */ (sizeof(Integer) == 4) { + return _interlockedbittestandset( + reinterpret_cast<volatile long*>(&atomic), static_cast<long>(bit)); + } else if /* constexpr */ (sizeof(Integer) == 8) { + return _interlockedbittestandset64( + reinterpret_cast<volatile long long*>(&atomic), + static_cast<long long>(bit)); + } else { + assert(sizeof(Integer) != 4 && sizeof(Integer) != 8); + return atomic_fetch_set_default(atomic, bit, order); + } +} + +template <typename Atomic> +inline bool +atomic_fetch_set_x86(Atomic& atomic, std::size_t bit, std::memory_order order) { + static_assert(!std::is_same<Atomic, std::atomic<std::uint32_t>>{}, ""); + static_assert(!std::is_same<Atomic, std::atomic<std::uint64_t>>{}, ""); + return atomic_fetch_set_default(atomic, bit, order); +} + +template <typename Integer> +inline bool atomic_fetch_reset_x86( + std::atomic<Integer>& atomic, + std::size_t bit, + std::memory_order order) { + static_assert(alignof(std::atomic<Integer>) == alignof(Integer), ""); + static_assert(sizeof(std::atomic<Integer>) == sizeof(Integer), ""); + assert(atomic.is_lock_free()); + + if /* constexpr */ (sizeof(Integer) == 4) { + return _interlockedbittestandreset( + reinterpret_cast<volatile long*>(&atomic), static_cast<long>(bit)); + } else if /* constexpr */ (sizeof(Integer) == 8) { + return _interlockedbittestandreset64( + reinterpret_cast<volatile long long*>(&atomic), + static_cast<long long>(bit)); + } else { + assert(sizeof(Integer) != 4 && sizeof(Integer) != 8); + return atomic_fetch_reset_default(atomic, bit, order); + } +} + +template <typename Atomic> +inline bool +atomic_fetch_reset_x86(Atomic& atomic, std::size_t bit, std::memory_order mo) { + static_assert(!std::is_same<Atomic, std::atomic<std::uint32_t>>{}, ""); + static_assert(!std::is_same<Atomic, std::atomic<std::uint64_t>>{}, ""); + return atomic_fetch_reset_default(atomic, bit, mo); +} + +#else + +template <typename Integer> +inline bool atomic_fetch_set_x86( + std::atomic<Integer>& atomic, + std::size_t bit, + std::memory_order order) { + auto previous = false; + + if /* constexpr */ (sizeof(Integer) == 2) { + auto pointer = reinterpret_cast<std::uint16_t*>(&atomic); + asm volatile("lock; btsw %1, (%2); 
setc %0" + : "=r"(previous) + : "ri"(static_cast<std::uint16_t>(bit)), "r"(pointer) + : "memory", "flags"); + } else if /* constexpr */ (sizeof(Integer) == 4) { + auto pointer = reinterpret_cast<std::uint32_t*>(&atomic); + asm volatile("lock; btsl %1, (%2); setc %0" + : "=r"(previous) + : "ri"(static_cast<std::uint32_t>(bit)), "r"(pointer) + : "memory", "flags"); + } else if /* constexpr */ (sizeof(Integer) == 8) { + auto pointer = reinterpret_cast<std::uint64_t*>(&atomic); + asm volatile("lock; btsq %1, (%2); setc %0" + : "=r"(previous) + : "ri"(static_cast<std::uint64_t>(bit)), "r"(pointer) + : "memory", "flags"); + } else { + assert(sizeof(Integer) == 1); + return atomic_fetch_set_default(atomic, bit, order); + } + + return previous; +} + +template <typename Atomic> +inline bool +atomic_fetch_set_x86(Atomic& atomic, std::size_t bit, std::memory_order order) { + static_assert(!is_atomic<Atomic>::value, ""); + return atomic_fetch_set_default(atomic, bit, order); +} + +template <typename Integer> +inline bool atomic_fetch_reset_x86( + std::atomic<Integer>& atomic, + std::size_t bit, + std::memory_order order) { + auto previous = false; + + if /* constexpr */ (sizeof(Integer) == 2) { + auto pointer = reinterpret_cast<std::uint16_t*>(&atomic); + asm volatile("lock; btrw %1, (%2); setc %0" + : "=r"(previous) + : "ri"(static_cast<std::uint16_t>(bit)), "r"(pointer) + : "memory", "flags"); + } else if /* constexpr */ (sizeof(Integer) == 4) { + auto pointer = reinterpret_cast<std::uint32_t*>(&atomic); + asm volatile("lock; btrl %1, (%2); setc %0" + : "=r"(previous) + : "ri"(static_cast<std::uint32_t>(bit)), "r"(pointer) + : "memory", "flags"); + } else if /* constexpr */ (sizeof(Integer) == 8) { + auto pointer = reinterpret_cast<std::uint64_t*>(&atomic); + asm volatile("lock; btrq %1, (%2); setc %0" + : "=r"(previous) + : "ri"(static_cast<std::uint64_t>(bit)), "r"(pointer) + : "memory", "flags"); + } else { + assert(sizeof(Integer) == 1); + return atomic_fetch_reset_default(atomic, bit, order); + } + + return previous; +} + +template <typename Atomic> +bool atomic_fetch_reset_x86( + Atomic& atomic, + std::size_t bit, + std::memory_order order) { + static_assert(!is_atomic<Atomic>::value, ""); + return atomic_fetch_reset_default(atomic, bit, order); +} + +#endif + +#else + +template <typename Atomic> +bool atomic_fetch_set_x86(Atomic&, std::size_t, std::memory_order) noexcept { + // This should never be called on non x86_64 platforms. + std::terminate(); +} +template <typename Atomic> +bool atomic_fetch_reset_x86(Atomic&, std::size_t, std::memory_order) noexcept { + // This should never be called on non x86_64 platforms. + std::terminate(); +} + +#endif + +} // namespace detail + +template <typename Atomic> +bool atomic_fetch_set(Atomic& atomic, std::size_t bit, std::memory_order mo) { + using Integer = decltype(atomic.load()); + static_assert(std::is_unsigned<Integer>{}, ""); + static_assert(!std::is_const<Atomic>{}, ""); + assert(bit < (sizeof(Integer) * 8)); + + // do the optimized thing on x86 builds. 
Also, some versions of TSAN do not + // properly instrument the inline assembly, so avoid it when TSAN is enabled + if (folly::kIsArchAmd64 && !folly::kIsSanitizeThread) { + return detail::atomic_fetch_set_x86(atomic, bit, mo); + } else { + // otherwise default to the default implementation using fetch_or() + return detail::atomic_fetch_set_default(atomic, bit, mo); + } +} + +template <typename Atomic> +bool atomic_fetch_reset(Atomic& atomic, std::size_t bit, std::memory_order mo) { + using Integer = decltype(atomic.load()); + static_assert(std::is_unsigned<Integer>{}, ""); + static_assert(!std::is_const<Atomic>{}, ""); + assert(bit < (sizeof(Integer) * 8)); + + // do the optimized thing on x86 builds. Also, some versions of TSAN do not + // properly instrument the inline assembly, so avoid it when TSAN is enabled + if (folly::kIsArchAmd64 && !folly::kIsSanitizeThread) { + return detail::atomic_fetch_reset_x86(atomic, bit, mo); + } else { + // otherwise default to the default implementation using fetch_and() + return detail::atomic_fetch_reset_default(atomic, bit, mo); + } +} + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/AtomicUtil.h b/src/rocksdb/third-party/folly/folly/synchronization/AtomicUtil.h new file mode 100644 index 000000000..95bcf73c5 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/AtomicUtil.h @@ -0,0 +1,52 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <cstdint> + +namespace folly { + +/** + * Sets a bit at the given index in the binary representation of the integer + * to 1. Returns the previous value of the bit, so true if the bit was not + * changed, false otherwise + * + * On some architectures, using this is more efficient than the corresponding + * std::atomic::fetch_or() with a mask. For example to set the first (least + * significant) bit of an integer, you could do atomic.fetch_or(0b1) + * + * The efficiency win is only visible in x86 (yet) and comes from the + * implementation using the x86 bts instruction when possible. + * + * When something other than std::atomic is passed, the implementation assumed + * incompatibility with this interface and calls Atomic::fetch_or() + */ +template <typename Atomic> +bool atomic_fetch_set( + Atomic& atomic, + std::size_t bit, + std::memory_order order = std::memory_order_seq_cst); + +/** + * Resets a bit at the given index in the binary representation of the integer + * to 0. Returns the previous value of the bit, so true if the bit was + * changed, false otherwise + * + * This follows the same underlying principle and implementation as + * fetch_set(). 
Using the optimized implementation when possible and falling + * back to std::atomic::fetch_and() when in debug mode or in an architecture + * where an optimization is not possible + */ +template <typename Atomic> +bool atomic_fetch_reset( + Atomic& atomic, + std::size_t bit, + std::memory_order order = std::memory_order_seq_cst); + +} // namespace folly + +#include <folly/synchronization/AtomicUtil-inl.h> diff --git a/src/rocksdb/third-party/folly/folly/synchronization/Baton.h b/src/rocksdb/third-party/folly/folly/synchronization/Baton.h new file mode 100644 index 000000000..6a6403def --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/Baton.h @@ -0,0 +1,327 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <assert.h> +#include <errno.h> +#include <stdint.h> +#include <atomic> +#include <thread> + +#include <folly/detail/Futex.h> +#include <folly/portability/Asm.h> +#include <folly/synchronization/WaitOptions.h> +#include <folly/synchronization/detail/Spin.h> + +namespace folly { + +/// A Baton allows a thread to block once and be awoken. Captures a +/// single handoff, and during its lifecycle (from construction/reset +/// to destruction/reset) a baton must either be post()ed and wait()ed +/// exactly once each, or not at all. +/// +/// Baton includes no internal padding, and is only 4 bytes in size. +/// Any alignment or padding to avoid false sharing is up to the user. +/// +/// This is basically a stripped-down semaphore that supports only a +/// single call to sem_post and a single call to sem_wait. +/// +/// The non-blocking version (MayBlock == false) provides more speed +/// by using only load acquire and store release operations in the +/// critical path, at the cost of disallowing blocking. +/// +/// The current posix semaphore sem_t isn't too bad, but this provides +/// more a bit more speed, inlining, smaller size, a guarantee that +/// the implementation won't change, and compatibility with +/// DeterministicSchedule. By having a much more restrictive +/// lifecycle we can also add a bunch of assertions that can help to +/// catch race conditions ahead of time. +template <bool MayBlock = true, template <typename> class Atom = std::atomic> +class Baton { + public: + static constexpr WaitOptions wait_options() { + return {}; + } + + constexpr Baton() noexcept : state_(INIT) {} + + Baton(Baton const&) = delete; + Baton& operator=(Baton const&) = delete; + + /// It is an error to destroy a Baton on which a thread is currently + /// wait()ing. In practice this means that the waiter usually takes + /// responsibility for destroying the Baton. + ~Baton() noexcept { + // The docblock for this function says that it can't be called when + // there is a concurrent waiter. We assume a strong version of this + // requirement in which the caller must _know_ that this is true, they + // are not allowed to be merely lucky. If two threads are involved, + // the destroying thread must actually have synchronized with the + // waiting thread after wait() returned. To convey causality the the + // waiting thread must have used release semantics and the destroying + // thread must have used acquire semantics for that communication, + // so we are guaranteed to see the post-wait() value of state_, + // which cannot be WAITING. 
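Stepping out of Baton for a moment, a sketch of the atomic_fetch_set()/atomic_fetch_reset() helpers declared in AtomicUtil.h above. The slot-claiming wrapper names are illustrative, not folly API:

    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <folly/synchronization/AtomicUtil.h>

    // Returns true if this call flipped the bit from 0 to 1, i.e. we now own
    // the slot; on x86 this can compile to a single "lock bts".
    inline bool try_claim_slot(std::atomic<std::uint64_t>& bitmap, std::size_t slot) {
      return !folly::atomic_fetch_set(bitmap, slot, std::memory_order_acq_rel);
    }

    // Clears the bit; the previous value must have been 1 for a valid release.
    inline void release_slot(std::atomic<std::uint64_t>& bitmap, std::size_t slot) {
      bool was_set = folly::atomic_fetch_reset(bitmap, slot, std::memory_order_acq_rel);
      assert(was_set);
      (void)was_set;
    }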
+ // + // Note that since we only care about a single memory location, + // the only two plausible memory orders here are relaxed and seq_cst. + assert(state_.load(std::memory_order_relaxed) != WAITING); + } + + bool ready() const noexcept { + auto s = state_.load(std::memory_order_acquire); + assert(s == INIT || s == EARLY_DELIVERY); + return (s == EARLY_DELIVERY); + } + + /// Equivalent to destroying the Baton and creating a new one. It is + /// a bug to call this while there is a waiting thread, so in practice + /// the waiter will be the one that resets the baton. + void reset() noexcept { + // See ~Baton for a discussion about why relaxed is okay here + assert(state_.load(std::memory_order_relaxed) != WAITING); + + // We use a similar argument to justify the use of a relaxed store + // here. Since both wait() and post() are required to be called + // only once per lifetime, no thread can actually call those methods + // correctly after a reset() unless it synchronizes with the thread + // that performed the reset(). If a post() or wait() on another thread + // didn't synchronize, then regardless of what operation we performed + // here there would be a race on proper use of the Baton's spec + // (although not on any particular load and store). Put another way, + // we don't need to synchronize here because anybody that might rely + // on such synchronization is required by the baton rules to perform + // an additional synchronization that has the desired effect anyway. + // + // There is actually a similar argument to be made about the + // constructor, in which the fenceless constructor initialization + // of state_ is piggybacked on whatever synchronization mechanism + // distributes knowledge of the Baton's existence + state_.store(INIT, std::memory_order_relaxed); + } + + /// Causes wait() to wake up. For each lifetime of a Baton (where a + /// lifetime starts at construction or reset() and ends at + /// destruction or reset()) there can be at most one call to post(), + /// in the single poster version. Any thread may call post(). + void post() noexcept { + if (!MayBlock) { + /// Spin-only version + /// + assert( + ((1 << state_.load(std::memory_order_relaxed)) & + ((1 << INIT) | (1 << EARLY_DELIVERY))) != 0); + state_.store(EARLY_DELIVERY, std::memory_order_release); + return; + } + + /// May-block versions + /// + uint32_t before = state_.load(std::memory_order_acquire); + + assert(before == INIT || before == WAITING || before == TIMED_OUT); + + if (before == INIT && + state_.compare_exchange_strong( + before, + EARLY_DELIVERY, + std::memory_order_release, + std::memory_order_relaxed)) { + return; + } + + assert(before == WAITING || before == TIMED_OUT); + + if (before == TIMED_OUT) { + return; + } + + assert(before == WAITING); + state_.store(LATE_DELIVERY, std::memory_order_release); + detail::futexWake(&state_, 1); + } + + /// Waits until post() has been called in the current Baton lifetime. + /// May be called at most once during a Baton lifetime (construction + /// |reset until destruction|reset). If post is called before wait in + /// the current lifetime then this method returns immediately. + /// + /// The restriction that there can be at most one wait() per lifetime + /// could be relaxed somewhat without any perf or size regressions, + /// but by making this condition very restrictive we can provide better + /// checking in debug builds. 
+ void wait(const WaitOptions& opt = wait_options()) noexcept { + if (try_wait()) { + return; + } + + auto const deadline = std::chrono::steady_clock::time_point::max(); + tryWaitSlow(deadline, opt); + } + + /// Similar to wait, but doesn't block the thread if it hasn't been posted. + /// + /// try_wait has the following semantics: + /// - It is ok to call try_wait any number times on the same baton until + /// try_wait reports that the baton has been posted. + /// - It is ok to call timed_wait or wait on the same baton if try_wait + /// reports that baton hasn't been posted. + /// - If try_wait indicates that the baton has been posted, it is invalid to + /// call wait, try_wait or timed_wait on the same baton without resetting + /// + /// @return true if baton has been posted, false othewise + bool try_wait() const noexcept { + return ready(); + } + + /// Similar to wait, but with a timeout. The thread is unblocked if the + /// timeout expires. + /// Note: Only a single call to wait/try_wait_for/try_wait_until is allowed + /// during a baton's life-cycle (from ctor/reset to dtor/reset). In other + /// words, after try_wait_for the caller can't invoke + /// wait/try_wait/try_wait_for/try_wait_until + /// again on the same baton without resetting it. + /// + /// @param timeout Time until which the thread can block + /// @return true if the baton was posted to before timeout, + /// false otherwise + template <typename Rep, typename Period> + bool try_wait_for( + const std::chrono::duration<Rep, Period>& timeout, + const WaitOptions& opt = wait_options()) noexcept { + if (try_wait()) { + return true; + } + + auto const deadline = std::chrono::steady_clock::now() + timeout; + return tryWaitSlow(deadline, opt); + } + + /// Similar to wait, but with a deadline. The thread is unblocked if the + /// deadline expires. + /// Note: Only a single call to wait/try_wait_for/try_wait_until is allowed + /// during a baton's life-cycle (from ctor/reset to dtor/reset). In other + /// words, after try_wait_until the caller can't invoke + /// wait/try_wait/try_wait_for/try_wait_until + /// again on the same baton without resetting it. + /// + /// @param deadline Time until which the thread can block + /// @return true if the baton was posted to before deadline, + /// false otherwise + template <typename Clock, typename Duration> + bool try_wait_until( + const std::chrono::time_point<Clock, Duration>& deadline, + const WaitOptions& opt = wait_options()) noexcept { + if (try_wait()) { + return true; + } + + return tryWaitSlow(deadline, opt); + } + + /// Alias to try_wait_for. Deprecated. + template <typename Rep, typename Period> + bool timed_wait( + const std::chrono::duration<Rep, Period>& timeout) noexcept { + return try_wait_for(timeout); + } + + /// Alias to try_wait_until. Deprecated. 
+ template <typename Clock, typename Duration> + bool timed_wait( + const std::chrono::time_point<Clock, Duration>& deadline) noexcept { + return try_wait_until(deadline); + } + + private: + enum State : uint32_t { + INIT = 0, + EARLY_DELIVERY = 1, + WAITING = 2, + LATE_DELIVERY = 3, + TIMED_OUT = 4, + }; + + template <typename Clock, typename Duration> + bool tryWaitSlow( + const std::chrono::time_point<Clock, Duration>& deadline, + const WaitOptions& opt) noexcept { + switch (detail::spin_pause_until(deadline, opt, [=] { return ready(); })) { + case detail::spin_result::success: + return true; + case detail::spin_result::timeout: + return false; + case detail::spin_result::advance: + break; + } + + if (!MayBlock) { + switch (detail::spin_yield_until(deadline, [=] { return ready(); })) { + case detail::spin_result::success: + return true; + case detail::spin_result::timeout: + return false; + case detail::spin_result::advance: + break; + } + } + + // guess we have to block :( + uint32_t expected = INIT; + if (!state_.compare_exchange_strong( + expected, + WAITING, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + // CAS failed, last minute reprieve + assert(expected == EARLY_DELIVERY); + // TODO: move the acquire to the compare_exchange failure load after C++17 + std::atomic_thread_fence(std::memory_order_acquire); + return true; + } + + while (true) { + auto rv = detail::futexWaitUntil(&state_, WAITING, deadline); + + // Awoken by the deadline passing. + if (rv == detail::FutexResult::TIMEDOUT) { + assert(deadline != (std::chrono::time_point<Clock, Duration>::max())); + state_.store(TIMED_OUT, std::memory_order_release); + return false; + } + + // Probably awoken by a matching wake event, but could also by awoken + // by an asynchronous signal or by a spurious wakeup. + // + // state_ is the truth even if FUTEX_WAIT reported a matching + // FUTEX_WAKE, since we aren't using type-stable storage and we + // don't guarantee reuse. The scenario goes like this: thread + // A's last touch of a Baton is a call to wake(), which stores + // LATE_DELIVERY and gets an unlucky context switch before delivering + // the corresponding futexWake. Thread B sees LATE_DELIVERY + // without consuming a futex event, because it calls futexWait + // with an expected value of WAITING and hence doesn't go to sleep. + // B returns, so the Baton's memory is reused and becomes another + // Baton (or a reuse of this one). B calls futexWait on the new + // Baton lifetime, then A wakes up and delivers a spurious futexWake + // to the same memory location. B's futexWait will then report a + // consumed wake event even though state_ is still WAITING. + // + // It would be possible to add an extra state_ dance to communicate + // that the futexWake has been sent so that we can be sure to consume + // it before returning, but that would be a perf and complexity hit. + uint32_t s = state_.load(std::memory_order_acquire); + assert(s == WAITING || s == LATE_DELIVERY); + if (s == LATE_DELIVERY) { + return true; + } + } + } + + detail::Futex<Atom> state_; +}; + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex-inl.h b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex-inl.h new file mode 100644 index 000000000..8eedb9cd3 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex-inl.h @@ -0,0 +1,1703 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
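A short end-to-end sketch of the Baton above, used for a one-shot handoff between two threads. The OneShot struct and handoff_example() are illustrative; the alignas shows one way to take on the padding responsibility the class deliberately leaves to the user:

    #include <thread>
    #include <folly/lang/Align.h>
    #include <folly/synchronization/Baton.h>

    struct OneShot {
      int result = 0;
      // keep the 4-byte baton away from result; the alignment also pads the
      // struct so nothing after it shares the baton's cache line
      alignas(folly::hardware_destructive_interference_size) folly::Baton<> done;
    };

    void handoff_example() {
      OneShot shared;
      std::thread worker([&] {
        shared.result = 42;
        shared.done.post();  // at most one post() per baton lifetime
      });
      shared.done.wait();    // at most one wait(); returns at once if already posted
      // post() stores with release and the wait path loads with acquire, so
      // reading shared.result here is properly ordered
      worker.join();
      // shared.done.reset() would start a new post()/wait() lifecycle
    }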
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <folly/synchronization/DistributedMutex.h> + +#include <folly/ConstexprMath.h> +#include <folly/Portability.h> +#include <folly/ScopeGuard.h> +#include <folly/Utility.h> +#include <folly/chrono/Hardware.h> +#include <folly/detail/Futex.h> +#include <folly/lang/Align.h> +#include <folly/lang/Bits.h> +#include <folly/portability/Asm.h> +#include <folly/synchronization/AtomicNotification.h> +#include <folly/synchronization/AtomicUtil.h> +#include <folly/synchronization/detail/InlineFunctionRef.h> +#include <folly/synchronization/detail/Sleeper.h> + +#include <array> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <limits> +#include <stdexcept> +#include <thread> +#include <utility> + +namespace folly { +namespace detail { +namespace distributed_mutex { +// kUnlocked is used to show unlocked state +// +// When locking threads encounter kUnlocked in the underlying storage, they +// can just acquire the lock without any further effort +constexpr auto kUnlocked = std::uintptr_t{0b0}; +// kLocked is used to show that the mutex is currently locked, and future +// attempts to lock the mutex should enqueue on the central storage +// +// Locking threads find this on central storage only when there is a +// contention chain that is undergoing wakeups, in every other case, a locker +// will either find kUnlocked or an arbitrary address with the kLocked bit set +constexpr auto kLocked = std::uintptr_t{0b1}; +// kTimedWaiter is set when there is at least one timed waiter on the mutex +// +// Timed waiters do not follow the sleeping strategy employed by regular, +// non-timed threads. They sleep on the central mutex atomic through an +// extended futex() interface that allows sleeping with the same semantics for +// non-standard integer widths +// +// When a regular non-timed thread unlocks or enqueues on the mutex, and sees +// a timed waiter, it takes ownership of all the timed waiters. The thread +// that has taken ownership of the timed waiter releases the timed waiters +// when it gets a chance at the critical section. At which point it issues a +// wakeup to single timed waiter, timed waiters always issue wake() calls to +// other timed waiters +constexpr auto kTimedWaiter = std::uintptr_t{0b10}; + +// kUninitialized means that the thread has just enqueued, and has not yet +// gotten to initializing itself with the address of its successor +// +// this becomes significant for threads that are trying to wake up the +// uninitialized thread, if they see that the thread is not yet initialized, +// they can do nothing but spin, and wait for the thread to get initialized +// +// This also plays a role in the functioning of flat combining as implemented +// in DistributedMutex. When a thread owning the lock goes through the +// contention chain to either unlock the mutex or combine critical sections +// from the other end. The presence of kUninitialized means that the +// combining thread is not able to make progress after this point. So we +// transfer the lock. 
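As a reading aid for the three lock-word constants above (not the actual implementation, which follows later in this file): a waiter's address and the low-bit flags share the single central word, which only works because waiter records are sufficiently aligned that their low bits are zero. The packing function below is a self-contained illustration with the constant values copied from above:

    #include <cassert>
    #include <cstdint>

    constexpr auto kLocked      = std::uintptr_t{0b1};   // values as defined above
    constexpr auto kTimedWaiter = std::uintptr_t{0b10};

    inline std::uintptr_t locked_word(const void* waiter_record, bool has_timed_waiter) {
      auto address = reinterpret_cast<std::uintptr_t>(waiter_record);
      assert((address & (kLocked | kTimedWaiter)) == 0);  // low bits must be free
      return address | kLocked | (has_timed_waiter ? kTimedWaiter : std::uintptr_t{0});
    }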
+constexpr auto kUninitialized = std::uint32_t{0b0}; +// kWaiting will be set in the waiter's futex structs while they are spinning +// while waiting for the mutex +constexpr auto kWaiting = std::uint32_t{0b1}; +// kWake will be set by threads that are waking up waiters that have enqueued +constexpr auto kWake = std::uint32_t{0b10}; +// kSkipped will be set by a waker when they see that a waiter has been +// preempted away by the kernel, in this case the thread that got skipped will +// have to wake up and put itself back on the queue +constexpr auto kSkipped = std::uint32_t{0b11}; +// kAboutToWait will be set by a waiter that enqueues itself with the purpose +// of waiting on a futex +constexpr auto kAboutToWait = std::uint32_t{0b100}; +// kSleeping will be set by a waiter right before enqueueing on a futex. When +// a thread wants to wake up a waiter that has enqueued on a futex, it should +// set the futex to contain kWake +// +// a thread that is unlocking and wants to skip over a sleeping thread also +// calls futex_.exchange(kSleeping) on the sleeping thread's futex word. It +// does this to 1. detect whether the sleeping thread had actually gone to +// sleeping on the futex word so it can skip it, and 2. to synchronize with +// other non atomic writes in the sleeping thread's context (such as the write +// to track the next waiting thread). +// +// We reuse kSleeping instead of say using another constant kEarlyDelivery to +// avoid situations where a thread has to enter kernel mode due to calling +// futexWait() twice because of the presence of a waking thread. This +// situation can arise when an unlocking thread goes to skip over a sleeping +// thread, sees that the thread has slept and move on, but the sleeping thread +// had not yet entered futex(). This interleaving causes the thread calling +// futex() to return spuriously, as the futex word is not what it should be +constexpr auto kSleeping = std::uint32_t{0b101}; +// kCombined is set by the lock holder to let the waiter thread know that its +// combine request was successfully completed by the lock holder. A +// successful combine means that the thread requesting the combine operation +// does not need to unlock the mutex; in fact, doing so would be an error. +constexpr auto kCombined = std::uint32_t{0b111}; +// kCombineUninitialized is like kUninitialized but is set by a thread when it +// enqueues in hopes of getting its critical section combined with the lock +// holder +constexpr auto kCombineUninitialized = std::uint32_t{0b1000}; +// kCombineWaiting is set by a thread when it is ready to have its combine +// record fulfilled by the lock holder. In particular, this signals to the +// lock holder that the thread has set its next_ pointer in the contention +// chain +constexpr auto kCombineWaiting = std::uint32_t{0b1001}; +// kExceptionOccurred is set on the waiter futex when the remote task throws +// an exception. It is the caller's responsibility to retrieve the exception +// and rethrow it in their own context. Note that when the caller uses a +// noexcept function as their critical section, they can avoid checking for +// this value +// +// This allows us to avoid all cost of exceptions in the memory layout of the +// fast path (no errors) as exceptions are stored as an std::exception_ptr in +// the same union that stores the return value of the critical section. 
We +// also avoid all CPU overhead because the combiner uses a try-catch block +// without any additional branching to handle exceptions +constexpr auto kExceptionOccurred = std::uint32_t{0b1010}; + +// The number of spins that we are allowed to do before we resort to marking a +// thread as having slept +// +// This is just a magic number from benchmarks +constexpr auto kScheduledAwaySpinThreshold = std::chrono::nanoseconds{200}; +// The maximum number of spins before a thread starts yielding its processor +// in hopes of getting skipped +constexpr auto kMaxSpins = 4000; +// The maximum number of contention chains we can resolve with flat combining. +// After this number of contention chains, the mutex falls back to regular +// two-phased mutual exclusion to ensure that we don't starve the combiner +// thread +constexpr auto kMaxCombineIterations = 2; + +/** + * Write only data that is available to the thread that is waking up another. + * Only the waking thread is allowed to write to this, the thread to be woken + * is allowed to read from this after a wakeup has been issued + */ +template <template <typename> class Atomic> +class WakerMetadata { + public: + explicit WakerMetadata( + std::uintptr_t waker = 0, + std::uintptr_t waiters = 0, + std::uint32_t sleeper = kUninitialized) + : waker_{waker}, waiters_{waiters}, sleeper_{sleeper} {} + + // This is the thread that initiated wakeups for the contention chain. + // There can only ever be one thread that initiates the wakeup for a + // chain in the spin only version of this mutex. When a thread that just + // woke up sees this as the next thread to wake up, it knows that it is the + // terminal node in the contention chain. This means that it was the one + // that took off the thread that had acquired the mutex off the centralized + // state. Therefore, the current thread is the last in its contention + // chain. It will fall back to centralized storage to pick up the next + // waiter or release the mutex + // + // When we move to a full sleeping implementation, this might need to change + // to a small_vector<> to account for failed wakeups, or we can put threads + // to sleep on the central futex, which is an easier implementation + // strategy. Although, since this is allocated on the stack, we can set a + // prohitively large threshold to avoid heap allocations, this strategy + // however, might cause increased cache misses on wakeup signalling + std::uintptr_t waker_{0}; + // the list of threads that the waker had previously seen to be sleeping on + // a futex(), + // + // this is given to the current thread as a means to pass on + // information. When the current thread goes to unlock the mutex and does + // not see contention, it should go and wake up the head of this list. If + // the current thread sees a contention chain on the mutex, it should pass + // on this list to the next thread that gets woken up + std::uintptr_t waiters_{0}; + // The futex that this waiter will sleep on + // + // how can we reuse futex_ from above for futex management? + Futex<Atomic> sleeper_{kUninitialized}; +}; + +/** + * Type of the type-erased callable that is used for combining from the lock + * holder's end. 
This has 48 bytes of inline storage that can be used to + * minimize cache misses when combining + */ +using CombineFunction = detail::InlineFunctionRef<void(), 48>; + +/** + * Waiter encapsulates the state required for waiting on the mutex, this + * contains potentially heavy state and is intended to be allocated on the + * stack as part of a lock() function call + * + * To ensure that synchronization does not cause unintended side effects on + * the rest of the thread stack (eg. metadata in lockImplementation(), or any + * other data in the user's thread), we aggresively pad this struct and use + * custom alignment internally to ensure that the relevant data fits within a + * single cacheline. The added alignment here also gives us some room to + * wiggle in the bottom few bits of the mutex, where we store extra metadata + */ +template <template <typename> class Atomic> +class Waiter { + public: + Waiter() {} + Waiter(Waiter&&) = delete; + Waiter(const Waiter&) = delete; + Waiter& operator=(Waiter&&) = delete; + Waiter& operator=(const Waiter&) = delete; + + void initialize(std::uint64_t futex, CombineFunction task) { + // we only initialize the function if we were actually given a non-null + // task, otherwise + if (task) { + assert(futex == kCombineUninitialized); + new (&function_) CombineFunction(task); + } else { + assert((futex == kUninitialized) || (futex == kAboutToWait)); + new (&metadata_) WakerMetadata<Atomic>{}; + } + + // this pedantic store is needed to ensure that the waking thread + // synchronizes with the state in the waiter struct when it loads the + // value of the futex word + // + // on x86, this gets optimized away to just a regular store, it might be + // needed on platforms where explicit acquire-release barriers are + // required for synchronization + // + // note that we release here at the end of the constructor because + // construction is complete here, any thread that acquires this release + // will see a well constructed wait node + futex_.store(futex, std::memory_order_release); + } + + std::array<std::uint8_t, hardware_destructive_interference_size> padding1; + // the atomic that this thread will spin on while waiting for the mutex to + // be unlocked + alignas(hardware_destructive_interference_size) Atomic<std::uint64_t> futex_{ + kUninitialized}; + // The successor of this node. This will be the thread that had its address + // on the mutex previously + // + // We can do without making this atomic since the remote thread synchronizes + // on the futex variable above. If this were not atomic, the remote thread + // would only be allowed to read from it after the waiter has moved into the + // waiting state to avoid risk of a load racing with a write. However, it + // helps to make this atomic because we can use an unconditional load and make + // full use of the load buffer to coalesce both reads into a single clock + // cycle after the line arrives in the combiner core. This is a heavily + // contended line, so an RFO from the enqueueing thread is highly likely and + // has the potential to cause an immediate invalidation; blocking the combiner + // thread from making progress until the line is pulled back to read this + // value + // + // Further, making this atomic prevents the compiler from making an incorrect + // optimization where it does not load the value as written in the code, but + // rather dereferences it through a pointer whenever needed (since the value + // of the pointer to this is readily available on the stack). 
Doing this + // causes multiple invalidation requests from the enqueueing thread, blocking + // remote progress + // + // Note that we use relaxed loads and stores, so this should not have any + // additional overhead compared to a regular load on most architectures + std::atomic<std::uintptr_t> next_{0}; + // We use an anonymous union for the combined critical section request and + // the metadata that will be filled in from the leader's end. Only one is + // active at a time - if a leader decides to combine the requested critical + // section into its execution, it will not touch the metadata field. If a + // leader decides to migrate the lock to the waiter, it will not touch the + // function + // + // this allows us to transfer more state when combining a critical section + // and reduce the cache misses originating from executing an arbitrary + // lambda + // + // note that this is an anonymous union, not an unnamed union, the members + // leak into the surrounding scope + union { + // metadata for the waker + WakerMetadata<Atomic> metadata_; + // The critical section that can potentially be combined into the critical + // section of the locking thread + // + // This is kept as a FunctionRef because the original function is preserved + // until the lock_combine() function returns. A consequence of using + // FunctionRef here is that we don't need to do any allocations and can + // allow users to capture unbounded state into the critical section. Flat + // combining means that the user does not have access to the thread + // executing the critical section, so assumptions about thread local + // references can be invalidated. Being able to capture arbitrary state + // allows the user to do thread local accesses right before the critical + // section and pass them as state to the callable being referenced here + CombineFunction function_; + // The user is allowed to use a combined critical section that returns a + // value. This buffer is used to implement the value transfer to the + // waiting thread. We reuse the same union because this helps us combine + // one synchronization operation with a material value transfer. + // + // The waker thread needs to synchronize on this cacheline to issue a + // wakeup to the waiter, meaning that the entire line needs to be pulled + // into the remote core in exclusive mode. So we reuse the coherence + // operation to transfer the return value in addition to the + // synchronization signal. In the case that the user's data item is + // small, the data is transferred all inline as part of the same line, + // which pretty much arrives into the CPU cache in the same clock cycle or + // two after a read-for-ownership request. This gives us a high chance of + // coalescing the entire transitive store buffer together into one cache + // coherence operation from the waker's end. This allows us to make use + // of the CPU bus bandwidth which would have otherwise gone to waste. + // Benchmarks prove this theory under a wide range of contention, value + // sizes, NUMA interactions and processor models + // + // The current version of the Intel optimization manual confirms this + // theory somewhat as well in section 2.3.5.1 (Load and Store Operation + // Overview) + // + // When an instruction writes data to a memory location [...], the + // processor ensures that it has the line containing this memory location + // is in its L1d cache [...]. If the cache line is not there, it fetches + // from the next levels using a RFO request [...] 
RFO and storing the + // data happens after instruction retirement. Therefore, the store + // latency usually does not affect the store instruction itself + // + // This gives the user the ability to input up to 48 bytes into the + // combined critical section through an InlineFunctionRef and output 48 + // bytes from it basically without any cost. The type of the entity + // stored in the buffer has to be matched by the type erased callable that + // the caller has used. At this point, the caller is still in the + // template instantiation leading to the combine request, so it has + // knowledge of the return type and can apply the appropriate + // reinterpret_cast and launder operation to safely retrieve the data from + // this buffer + _t<std::aligned_storage<48, 8>> storage_; + }; + std::array<std::uint8_t, hardware_destructive_interference_size> padding2; +}; + +/** + * A template that helps us differentiate between the different ways to return + * a value from a combined critical section. A return value of type void + * cannot be stored anywhere, so we use specializations and pick the right one + * switched through std::conditional_t + * + * This is then used by CoalescedTask and its family of functions to implement + * efficient return value transfers to the waiting threads + */ +template <typename Func> +class RequestWithReturn { + public: + using F = Func; + using ReturnType = decltype(std::declval<const Func&>()()); + explicit RequestWithReturn(Func func) : func_{std::move(func)} {} + + /** + * We need to define the destructor here because C++ requires (with good + * reason) that a union with non-default destructor be explicitly destroyed + * from the surrounding class, as neither the runtime nor compiler have the + * knowledge of what to do with a union at the time of destruction + * + * Each request that has a valid return value set will have the value + * retrieved from the get() method, where the value is destroyed. So we + * don't need to destroy it here + */ + ~RequestWithReturn() {} + + /** + * This method can be used to return a value from the request. This returns + * the underlying value because return type of the function we were + * instantiated with is not void + */ + ReturnType get() && { + // when the return value has been processed, we destroy the value + // contained in this request. 
Using a scope_exit means that we don't have + // to worry about storing the value somewhere and causing potentially an + // extra move + // + // note that the invariant here is that this function is only called if the + // requesting thread had its critical section combined, and the value_ + // member constructed through detach() + SCOPE_EXIT { + value_.~ReturnType(); + }; + return std::move(value_); + } + + // this contains a copy of the function the waiter had requested to be + // executed as a combined critical section + Func func_; + // this stores the return value used in the request, we use a union here to + // avoid laundering and allow return types that are not default + // constructible to be propagated through the execution of the critical + // section + // + // note that this is an anonymous union, the member leaks into the + // surrounding scope as a member variable + union { + ReturnType value_; + }; +}; + +template <typename Func> +class RequestWithoutReturn { + public: + using F = Func; + using ReturnType = void; + explicit RequestWithoutReturn(Func func) : func_{std::move(func)} {} + + /** + * In this version of the request class, get() returns nothing as there is + * no stored value + */ + void get() && {} + + // this contains a copy of the function the waiter had requested to be + // executed as a combined critical section + Func func_; +}; + +// we need to use std::integral_constant::value here as opposed to +// std::integral_constant::operator T() because MSVC errors out with the +// implicit conversion +template <typename Func> +using Request = _t<std::conditional< + std::is_void<decltype(std::declval<const Func&>()())>::value, + RequestWithoutReturn<Func>, + RequestWithReturn<Func>>>; + +/** + * A template that helps us to transform a callable returning a value to one + * that returns void so it can be type erased and passed on to the waker. If + * the return value is small enough, it gets coalesced into the wait struct + * for optimal data transfer. When it's not small enough to fit in the waiter + * storage buffer, we place it on its own cacheline with isolation to prevent + * false-sharing with the on-stack metadata of the waiter thread + * + * This helps a combined critical section feel more normal in the case where + * the user wants to return a value, for example + * + * auto value = mutex_.lock_combine([&]() { + * return data_.value(); + * }); + * + * Without this, the user would typically create a dummy object that they + * would then assign to from within the lambda. With return value chaining, + * this pattern feels more natural + * + * Note that it is important to copy the entire callable into this class.
+ * Storing something like a reference instead is not desirable because it does + * not allow InlineFunctionRef to use inline storage to represent the user's + * callable without extra indirections + * + * We use std::conditional_t and switch to the right type of task with the + * CoalescedTask type alias + */ +template <typename Func, typename Waiter> +class TaskWithCoalesce { + public: + using ReturnType = decltype(std::declval<const Func&>()()); + using StorageType = folly::Unit; + explicit TaskWithCoalesce(Func func, Waiter& waiter) + : func_{std::move(func)}, waiter_(waiter) {} + + void operator()() const { + auto value = func_(); + new (&waiter_.storage_) ReturnType(std::move(value)); + } + + private: + Func func_; + Waiter& waiter_; + + static_assert(!std::is_void<ReturnType>{}, ""); + static_assert(alignof(decltype(waiter_.storage_)) >= alignof(ReturnType), ""); + static_assert(sizeof(decltype(waiter_.storage_)) >= sizeof(ReturnType), ""); +}; + +template <typename Func, typename Waiter> +class TaskWithoutCoalesce { + public: + using ReturnType = void; + using StorageType = folly::Unit; + explicit TaskWithoutCoalesce(Func func, Waiter&) : func_{std::move(func)} {} + + void operator()() const { + func_(); + } + + private: + Func func_; +}; + +template <typename Func, typename Waiter> +class TaskWithBigReturnValue { + public: + // Using storage that is aligned on the cacheline boundary helps us avoid a + // situation where the data ends up being allocated on two separate + // cachelines. This would require the remote thread to pull in both lines + // to issue a write. + // + // We also isolate the storage by appending some padding to the end to + // ensure we avoid false-sharing with the metadata used while the waiter + // waits + using ReturnType = decltype(std::declval<const Func&>()()); + static const auto kReturnValueAlignment = folly::kIsMsvc + ? 
8 + : folly::constexpr_max( + alignof(ReturnType), + folly::hardware_destructive_interference_size); + using StorageType = _t<std::aligned_storage< + sizeof( + _t<std::aligned_storage<sizeof(ReturnType), kReturnValueAlignment>>), + kReturnValueAlignment>>; + + explicit TaskWithBigReturnValue(Func func, Waiter&) + : func_{std::move(func)} {} + + void operator()() const { + assert(storage_); + auto value = func_(); + new (storage_) ReturnType(std::move(value)); + } + + void attach(StorageType* storage) { + assert(!storage_); + storage_ = storage; + } + + private: + Func func_; + StorageType* storage_{nullptr}; + + static_assert(!std::is_void<ReturnType>{}, ""); + static_assert(sizeof(Waiter::storage_) < sizeof(ReturnType), ""); +}; + +template <typename T, bool> +struct Sizeof_; +template <typename T> +struct Sizeof_<T, false> : std::integral_constant<std::size_t, sizeof(T)> {}; +template <typename T> +struct Sizeof_<T, true> : std::integral_constant<std::size_t, 0> {}; +template <typename T> +struct Sizeof : Sizeof_<T, std::is_void<T>::value> {}; + +// we need to use std::integral_constant::value here as opposed to +// std::integral_constant::operator T() because MSVC errors out with the +// implicit conversion +template <typename Func, typename Waiter> +using CoalescedTask = _t<std::conditional< + std::is_void<decltype(std::declval<const Func&>()())>::value, + TaskWithoutCoalesce<Func, Waiter>, + _t<std::conditional< + Sizeof<decltype(std::declval<const Func&>()())>::value <= + sizeof(Waiter::storage_), + TaskWithCoalesce<Func, Waiter>, + TaskWithBigReturnValue<Func, Waiter>>>>>; + +/** + * Given a request and a wait node, coalesce them into a CoalescedTask that + * coalesces the return value into the wait node when invoked from a remote + * thread + * + * When given a null request through nullptr_t, coalesce() returns null as well + */ +template <typename Waiter> +std::nullptr_t coalesce(std::nullptr_t&, Waiter&) { + return nullptr; +} + +template < + typename Request, + typename Waiter, + typename Func = typename Request::F> +CoalescedTask<Func, Waiter> coalesce(Request& request, Waiter& waiter) { + static_assert(!std::is_same<Request, std::nullptr_t>{}, ""); + return CoalescedTask<Func, Waiter>{request.func_, waiter}; +} + +/** + * Given a task, create storage for the return value. When we get a type + * of CoalescedTask, this returns an instance of CoalescedTask::StorageType. + * std::nullptr_t otherwise + */ +inline std::nullptr_t makeReturnValueStorageFor(std::nullptr_t&) { + return {}; +} + +template < + typename CoalescedTask, + typename StorageType = typename CoalescedTask::StorageType> +StorageType makeReturnValueStorageFor(CoalescedTask&) { + return {}; +} + +/** + * Given a task and storage, attach them together if needed. This only helps + * when we have a task that returns a value bigger than can be coalesced. 
In + * that case, we need to attach the storage with the task so the return value + * can be transferred to this thread from the remote thread + */ +template <typename Task, typename Storage> +void attach(Task&, Storage&) { + static_assert( + std::is_same<Storage, std::nullptr_t>{} || + std::is_same<Storage, folly::Unit>{}, + ""); +} + +template < + typename R, + typename W, + typename StorageType = typename TaskWithBigReturnValue<R, W>::StorageType> +void attach(TaskWithBigReturnValue<R, W>& task, StorageType& storage) { + task.attach(&storage); +} + +template <typename Request, typename Waiter> +void throwIfExceptionOccurred(Request&, Waiter& waiter, bool exception) { + using Storage = decltype(waiter.storage_); + using F = typename Request::F; + static_assert(sizeof(Storage) >= sizeof(std::exception_ptr), ""); + static_assert(alignof(Storage) >= alignof(std::exception_ptr), ""); + + // we only need to check for an exception in the waiter struct if the passed + // callable is not noexcept + // + // we need to make another instance of the exception with automatic storage + // duration and destroy the exception held in the storage *before throwing* to + // avoid leaks. If we don't destroy the exception_ptr in storage, the + // refcount for the internal exception will never hit zero, thereby leaking + // memory + if ((!noexcept(std::declval<const F&>()()) && exception)) { + auto storage = &waiter.storage_; + auto exc = folly::launder(reinterpret_cast<std::exception_ptr*>(storage)); + auto copy = std::move(*exc); + exc->std::exception_ptr::~exception_ptr(); + std::rethrow_exception(std::move(copy)); + } +} + +/** + * Given a CoalescedTask, a wait node and a request. Detach the return value + * into the request from the wait node and task. + */ +template <typename Waiter> +void detach(std::nullptr_t&, Waiter&, bool exception, std::nullptr_t&) { + assert(!exception); +} + +template <typename Waiter, typename F> +void detach( + RequestWithoutReturn<F>& request, + Waiter& waiter, + bool exception, + folly::Unit&) { + throwIfExceptionOccurred(request, waiter, exception); +} + +template <typename Waiter, typename F> +void detach( + RequestWithReturn<F>& request, + Waiter& waiter, + bool exception, + folly::Unit&) { + throwIfExceptionOccurred(request, waiter, exception); + + using ReturnType = typename RequestWithReturn<F>::ReturnType; + static_assert(!std::is_same<ReturnType, void>{}, ""); + static_assert(sizeof(waiter.storage_) >= sizeof(ReturnType), ""); + + auto& val = *folly::launder(reinterpret_cast<ReturnType*>(&waiter.storage_)); + new (&request.value_) ReturnType(std::move(val)); + val.~ReturnType(); +} + +template <typename Waiter, typename F, typename Storage> +void detach( + RequestWithReturn<F>& request, + Waiter& waiter, + bool exception, + Storage& storage) { + throwIfExceptionOccurred(request, waiter, exception); + + using ReturnType = typename RequestWithReturn<F>::ReturnType; + static_assert(!std::is_same<ReturnType, void>{}, ""); + static_assert(sizeof(storage) >= sizeof(ReturnType), ""); + + auto& val = *folly::launder(reinterpret_cast<ReturnType*>(&storage)); + new (&request.value_) ReturnType(std::move(val)); + val.~ReturnType(); +} + +/** + * Get the time since epoch in nanoseconds + * + * This is faster than std::chrono::steady_clock because it avoids a VDSO + * access to get the timestamp counter + * + * Note that the hardware timestamp counter on x86, like std::steady_clock is + * guaranteed to be monotonically increasing - + * 
https://c9x.me/x86/html/file_module_x86_id_278.html + */ +inline std::chrono::nanoseconds time() { + return std::chrono::nanoseconds{hardware_timestamp()}; +} + +/** + * Zero out the other bits used by the implementation and return just an + * address from a uintptr_t + */ +template <typename Type> +Type* extractPtr(std::uintptr_t from) { + // shift one bit off the end, to get all 1s followed by a single 0 + auto mask = std::numeric_limits<std::uintptr_t>::max(); + mask >>= 1; + mask <<= 1; + assert(!(mask & 0b1)); + + return folly::bit_cast<Type*>(from & mask); +} + +/** + * Strips the given nanoseconds into only the least significant 56 bits by + * moving the least significant 56 bits over by 8 zeroing out the bottom 8 + * bits to be used as a medium of information transfer for the thread wait + * nodes + */ +inline std::uint64_t strip(std::chrono::nanoseconds t) { + auto time = t.count(); + return static_cast<std::uint64_t>(time) << 8; +} + +/** + * Recover the timestamp value from an integer that has the timestamp encoded + * in it + */ +inline std::uint64_t recover(std::uint64_t from) { + return from >> 8; +} + +template <template <typename> class Atomic, bool TimePublishing> +class DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy { + public: + // DistributedMutexStateProxy is move constructible and assignable for + // convenience + DistributedMutexStateProxy(DistributedMutexStateProxy&& other) { + *this = std::move(other); + } + + DistributedMutexStateProxy& operator=(DistributedMutexStateProxy&& other) { + assert(!(*this)); + + next_ = folly::exchange(other.next_, nullptr); + expected_ = folly::exchange(other.expected_, 0); + timedWaiters_ = folly::exchange(other.timedWaiters_, false); + combined_ = folly::exchange(other.combined_, false); + waker_ = folly::exchange(other.waker_, 0); + waiters_ = folly::exchange(other.waiters_, nullptr); + ready_ = folly::exchange(other.ready_, nullptr); + + return *this; + } + + // The proxy is valid when a mutex acquisition attempt was successful, + // lock() is guaranteed to return a valid proxy, try_lock() is not + explicit operator bool() const { + return expected_; + } + + // private: + // friend the mutex class, since that will be accessing state private to + // this class + friend class DistributedMutex<Atomic, TimePublishing>; + + DistributedMutexStateProxy( + Waiter<Atomic>* next, + std::uintptr_t expected, + bool timedWaiter = false, + bool combined = false, + std::uintptr_t waker = 0, + Waiter<Atomic>* waiters = nullptr, + Waiter<Atomic>* ready = nullptr) + : next_{next}, + expected_{expected}, + timedWaiters_{timedWaiter}, + combined_{combined}, + waker_{waker}, + waiters_{waiters}, + ready_{ready} {} + + // the next thread that is to be woken up, this being null at the time of + // unlock() shows that the current thread acquired the mutex without + // contention or it was the terminal thread in the queue of threads waking up + Waiter<Atomic>* next_{nullptr}; + // this is the value that the current thread should expect to find on + // unlock, and if this value is not there on unlock, the current thread + // should assume that other threads are enqueued waiting for the mutex + // + // note that if the mutex has the same state set at unlock time, and this is + // set to an address (and not say kLocked in the case of a terminal waker) + // then it must have been the case that no other thread had enqueued itself, + // since threads in the domain of this mutex do not share stack space + // + // if we want to support stack 
sharing, we can solve the problem by looping + // at lock time, and setting a variable that says whether we have acquired + // the lock or not perhaps + std::uintptr_t expected_{0}; + // a boolean that will be set when the mutex has timed waiters that the + // current thread is responsible for waking, in such a case, the current + // thread will issue an atomic_notify_one() call after unlocking the mutex + // + // note that a timed waiter will itself always have this flag set. This is + // done so we can avoid having to issue an atomic_notify_all() call (and + // subsequently a thundering herd) when waking up timed-wait threads + bool timedWaiters_{false}; + // a boolean that contains true if the state proxy is not meant to be passed + // to the unlock() function. This is set only when there is contention and + // a thread had asked for its critical section to be combined + bool combined_{false}; + // metadata passed along from the thread that woke this thread up + std::uintptr_t waker_{0}; + // the list of threads that are waiting on a futex + // + // the current thread is meant to wake up this list of waiters if it is + // able to commit an unlock() on the mutex without seeing a contention chain + Waiter<Atomic>* waiters_{nullptr}; + // after a thread has woken up from a futex() call, it will have the rest of + // the threads that were waiting behind it in this list, a thread that + // unlocks has to wake up threads from this list if it has any, before it + // goes to sleep to prevent pathological unfairness + Waiter<Atomic>* ready_{nullptr}; +}; + +template <template <typename> class Atomic, bool TimePublishing> +DistributedMutex<Atomic, TimePublishing>::DistributedMutex() + : state_{kUnlocked} {} + +template <typename Waiter> +std::uint64_t publish( + std::uint64_t spins, + bool& shouldPublish, + std::chrono::nanoseconds& previous, + Waiter& waiter, + std::uint32_t waitMode) { + // time publishing has some overhead because it executes an atomic exchange on + // the futex word. If this line is in a remote thread (e.g. the combiner), + // then each time we publish a timestamp, this thread has to submit an RFO to + // the remote core for the cacheline, blocking progress for both threads. + // + // the remote core uses a store in the fast path - why then does an RFO make a + // difference? The only educated guess we have here is that the added + // roundtrip delays draining of the store buffer, which essentially exerts + // backpressure on future stores, preventing parallelization + // + // if we have requested a combine, time publishing is less important as it + // only comes into play when the combiner has exhausted their max combine + // passes. So we defer time publishing to the point when the current thread + // gets preempted + auto current = time(); + if ((current - previous) >= kScheduledAwaySpinThreshold) { + shouldPublish = true; + } + previous = current; + + // if we have requested a combine, and this is the first iteration of the + // wait-loop, we publish a max timestamp to optimistically convey that we have + // not yet been preempted (the remote knows the meaning of max timestamps) + // + // then if we are under the maximum number of spins allowed before sleeping, + // we publish the exact timestamp, otherwise we publish the minimum possible + // timestamp to force the waking thread to skip us + auto now = ((waitMode == kCombineWaiting) && !spins) + ? decltype(time())::max() + : (spins < kMaxSpins) ?
previous : decltype(time())::zero(); + + // the wait mode information is published in the bottom 8 bits of the futex + // word, the rest contains time information as computed above. Overflows are + // not really a correctness concern because time publishing is only a + // heuristic. This leaves us 56 bits of nanoseconds (2 years) before we hit + // two consecutive wraparounds, so the lack of bits to represent time is + // neither a performance nor correctness concern + auto data = strip(now) | waitMode; + auto signal = (shouldPublish || !spins || (waitMode != kCombineWaiting)) + ? waiter.futex_.exchange(data, std::memory_order_acq_rel) + : waiter.futex_.load(std::memory_order_acquire); + return signal & std::numeric_limits<std::uint8_t>::max(); +} + +template <typename Waiter> +bool spin(Waiter& waiter, std::uint32_t& sig, std::uint32_t mode) { + auto spins = std::uint64_t{0}; + auto waitMode = (mode == kCombineUninitialized) ? kCombineWaiting : kWaiting; + auto previous = time(); + auto shouldPublish = false; + while (true) { + auto signal = publish(spins++, shouldPublish, previous, waiter, waitMode); + + // if we got skipped, make a note of it and return if we got a skipped + // signal or a signal to wake up + auto skipped = (signal == kSkipped); + auto combined = (signal == kCombined); + auto exceptionOccurred = (signal == kExceptionOccurred); + auto woken = (signal == kWake); + if (skipped || woken || combined || exceptionOccurred) { + sig = static_cast<std::uint32_t>(signal); + return !skipped; + } + + // if we are under the spin threshold, pause to allow the other + // hyperthread to run. If not, then sleep + if (spins < kMaxSpins) { + asm_volatile_pause(); + } else { + Sleeper::sleep(); + } + } +} + +template <typename Waiter> +void doFutexWake(Waiter* waiter) { + if (waiter) { + // We can use a simple store operation here and not worry about checking + // to see if the thread had actually started waiting on the futex, that is + // already done in tryWake() when a sleeping thread is collected + // + // We now do not know whether the waiter had already enqueued on the futex + // or whether it had just stored kSleeping in its futex and was about to + // call futexWait(). We treat both these scenarios the same + // + // the below can theoretically cause a problem if we set the + // wake signal and the waiter was in between setting kSleeping in its + // futex and enqueueing on the futex. In this case the waiter will just + // return from futexWait() immediately. This leaves the address that the + // waiter was using for futexWait() possibly dangling, and the thread that + // we woke in the exchange above might have used that address for some + // other object + // + // however, even if the thread had indeed woken up simply because of the + // above exchange(), the futexWake() below is not incorrect. It is not + // incorrect because futexWake() does not actually change the memory of + // the futex word. It just uses the address to do a lookup in the kernel + // futex table.
And even if we call futexWake() on some other address, + // and that address was being used to wait on futex() that thread will + // protect itself from spurious wakeups, check the value in the futex word + // and enqueue itself back on the futex + // + // this dangling pointer possibility is why we use a pointer to the futex + // word, and avoid dereferencing after the store() operation + auto sleeper = &waiter->metadata_.sleeper_; + sleeper->store(kWake, std::memory_order_release); + futexWake(sleeper, 1); + } +} + +template <typename Waiter> +bool doFutexWait(Waiter* waiter, Waiter*& next) { + // first we get ready to sleep by calling exchange() on the futex with a + // kSleeping value + assert(waiter->futex_.load(std::memory_order_relaxed) == kAboutToWait); + + // note the semantics of using a futex here, when we exchange the sleeper_ + // with kSleeping, we are getting ready to sleep, and we return from + // futexWait() when the value of sleeper_ might have changed. We can also + // wake up because of a spurious wakeup, so we always check against the + // value in sleeper_ after returning from futexWait(), if the value is not + // kWake, then we continue + auto pre = + waiter->metadata_.sleeper_.exchange(kSleeping, std::memory_order_acq_rel); + + // Seeing a kSleeping on a futex word before we set it ourselves means only + // one thing - an unlocking thread caught us before we went to futex(), and + // we now have the lock, so we abort + // + // if we were given an early delivery, we can return from this function with + // a true, meaning that we now have the lock + if (pre == kSleeping) { + return true; + } + + // if we reach here then we were not given an early delivery, and any + // thread that goes to wake us up will see a consistent view of the rest of + // the contention chain (since the next_ variable is set before the + // kSleeping exchange above) + while (pre != kWake) { + // before enqueueing on the futex, we wake any waiters that we were + // possibly responsible for + doFutexWake(folly::exchange(next, nullptr)); + + // then we wait on the futex + // + // note that we have to protect ourselves against spurious wakeups here. + // Because the corresponding futexWake() above does not synchronize + // wakeups around the futex word.
Because doing so would become + // inefficient + futexWait(&waiter->metadata_.sleeper_, kSleeping); + pre = waiter->metadata_.sleeper_.load(std::memory_order_acquire); + assert((pre == kSleeping) || (pre == kWake)); + } + + // when coming out of a futex, we might have some other sleeping threads + // that we were supposed to wake up, assign that to the next pointer + assert(next == nullptr); + next = extractPtr<Waiter>(waiter->next_.load(std::memory_order_relaxed)); + return false; +} + +template <typename Waiter> +bool wait(Waiter* waiter, std::uint32_t mode, Waiter*& next, uint32_t& signal) { + if (mode == kAboutToWait) { + return doFutexWait(waiter, next); + } + + return spin(*waiter, signal, mode); +} + +inline void recordTimedWaiterAndClearTimedBit( + bool& timedWaiter, + std::uintptr_t& previous) { + // the previous value in the mutex can never be kTimedWaiter, timed waiters + // always set (kTimedWaiter | kLocked) in the mutex word when they try and + // acquire the mutex + assert(previous != kTimedWaiter); + + if ((previous & kTimedWaiter)) { + // record whether there was a timed waiter in the previous mutex state, and + // clear the timed bit from the previous state + timedWaiter = true; + previous = previous & (~kTimedWaiter); + } +} + +template <typename Atomic> +void wakeTimedWaiters(Atomic* state, bool timedWaiters) { + if ((timedWaiters)) { + atomic_notify_one(state); + } +} + +template <template <typename> class Atomic, bool TimePublishing> +template <typename Func> +auto DistributedMutex<Atomic, TimePublishing>::lock_combine(Func func) + -> decltype(std::declval<const Func&>()()) { + // invoke the lock implementation function and check whether we came out of + // it with our task executed as a combined critical section. This usually + // happens when the mutex is contended. + // + // In the absence of contention, we just return from the try_lock() function + // with the lock acquired. So we need to invoke the task and unlock + // the mutex before returning + auto&& task = Request<Func>{func}; + auto&& state = lockImplementation(*this, state_, task); + if (!state.combined_) { + // to avoid having to play a return-value dance when the combinable + // returns void, we use a scope exit to perform the unlock after the + // function return has been processed + SCOPE_EXIT { + unlock(std::move(state)); + }; + return func(); + } + + // if we are here, that means we were able to get our request combined, we + // can return the value that was transferred to us + // + // each thread that enqueues as a part of a contention chain takes up the + // responsibility of any timed waiter that had come immediately before it, + // so we wake up timed waiters before exiting the lock function. Another + // strategy might be to add the timed waiter information to the metadata and + // let a single leader wake up a timed waiter for better concurrency. 
But + // this has proven not to be useful in benchmarks beyond a small 5% delta, + // so we avoid taking the complexity hit and branch to wake up timed waiters + // from each thread + wakeTimedWaiters(&state_, state.timedWaiters_); + return std::move(task).get(); +} + +template <template <typename> class Atomic, bool TimePublishing> +typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy +DistributedMutex<Atomic, TimePublishing>::lock() { + auto null = nullptr; + return lockImplementation(*this, state_, null); +} + +template <typename Atomic, template <typename> class A, bool T> +auto tryLockNoLoad(Atomic& atomic, DistributedMutex<A, T>&) + -> typename DistributedMutex<A, T>::DistributedMutexStateProxy { + // Try and set the least significant bit of the centralized lock state to 1, + // if this succeeds, it must have been the case that we had a kUnlocked (or + // 0) in the central storage before, since that is the only case where a 0 + // can be found in the least significant bit + // + // If this fails, then it is a no-op + using Proxy = typename DistributedMutex<A, T>::DistributedMutexStateProxy; + auto previous = atomic_fetch_set(atomic, 0, std::memory_order_acquire); + if (!previous) { + return Proxy{nullptr, kLocked}; + } + + return Proxy{nullptr, 0}; +} + +template <template <typename> class Atomic, bool TimePublishing> +typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy +DistributedMutex<Atomic, TimePublishing>::try_lock() { + // The lock attempt below requires an expensive atomic fetch-and-mutate or + // an even more expensive atomic compare-and-swap loop depending on the + // platform. These operations require pulling the lock cacheline into the + // current core in exclusive mode and are therefore hard to parallelize + // + // This probabilistically avoids the expense by first checking whether the + // mutex is currently locked + if (state_.load(std::memory_order_relaxed) != kUnlocked) { + return DistributedMutexStateProxy{nullptr, 0}; + } + + return tryLockNoLoad(state_, *this); +} + +template < + template <typename> class Atomic, + bool TimePublishing, + typename State, + typename Request> +typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy +lockImplementation( + DistributedMutex<Atomic, TimePublishing>& mutex, + State& atomic, + Request& request) { + // first try and acquire the lock as a fast path, the underlying + // implementation is slightly faster than using std::atomic::exchange() as + // is used in this function. So we get a small perf boost in the + // uncontended case + // + // We only go through this fast path for the lock/unlock usage and avoid this + // for combined critical sections. This check adds unnecessary overhead in + // that case as it causes an extra cacheline bounce + constexpr auto combineRequested = !std::is_same<Request, std::nullptr_t>{}; + if (!combineRequested) { + if (auto state = tryLockNoLoad(atomic, mutex)) { + return state; + } + } + + auto previous = std::uintptr_t{0}; + auto waitMode = combineRequested ? 
kCombineUninitialized : kUninitialized; + auto nextWaitMode = kAboutToWait; + auto timedWaiter = false; + Waiter<Atomic>* nextSleeper = nullptr; + while (true) { + // construct the state needed to wait + // + // We can't use auto here because MSVC errors out due to a missing copy + // constructor + Waiter<Atomic> state{}; + auto&& task = coalesce(request, state); + auto&& storage = makeReturnValueStorageFor(task); + auto&& address = folly::bit_cast<std::uintptr_t>(&state); + attach(task, storage); + state.initialize(waitMode, std::move(task)); + assert(!(address & 0b1)); + + // set the locked bit in the address we will be persisting in the mutex + address |= kLocked; + + // attempt to acquire the mutex, mutex acquisition is successful if the + // previous value is zeroed out + // + // we use memory_order_acq_rel here because we want the read-modify-write + // operation to be both acquire and release. Acquire because if this is a + // successful lock acquisition, we want to acquire state that any other + // thread has released from a prior unlock. We want release semantics + // because other threads that read the address of this value should see the + // full well-initialized node we are going to wait on if the mutex + // acquisition was unsuccessful + previous = atomic.exchange(address, std::memory_order_acq_rel); + recordTimedWaiterAndClearTimedBit(timedWaiter, previous); + state.next_.store(previous, std::memory_order_relaxed); + if (previous == kUnlocked) { + return {/* next */ nullptr, + /* expected */ address, + /* timedWaiter */ timedWaiter, + /* combined */ false, + /* waker */ 0, + /* waiters */ nullptr, + /* ready */ nextSleeper}; + } + assert(previous & kLocked); + + // wait until we get a signal from another thread, if this returns false, + // we got skipped and had probably been scheduled out, so try again + auto signal = kUninitialized; + if (!wait(&state, waitMode, nextSleeper, signal)) { + std::swap(waitMode, nextWaitMode); + continue; + } + + // at this point it is safe to access the other fields in the waiter state, + // since the thread that woke us up is gone and nobody will be touching this + // state again, note that this requires memory ordering, and this is why we + // use memory_order_acquire (among other reasons) in the above wait + // + // first we see if the value we took off the mutex state was the thread that + // initiated the wakeups, if so, we are the terminal node of the current + // contention chain. If we are the terminal node, then we should expect to + // see a kLocked in the mutex state when we unlock, if we see that, we can + // commit the unlock to the centralized mutex state. If not, we need to + // continue wakeups + // + // a nice consequence of passing kLocked as the current address if we are + // the terminal node is that it naturally just works with the algorithm.
If + // we get a contention chain when coming out of a contention chain, the tail + // of the new contention chain will have kLocked set as the previous, which, + // as it happens "just works", since we have now established a recursive + // relationship until broken + auto next = previous; + auto expected = address; + if (previous == state.metadata_.waker_) { + next = 0; + expected = kLocked; + } + + // if we were given a combine signal, detach the return value from the + // wait struct into the request, so the current thread can access it + // outside this function + auto combined = (signal == kCombined); + auto exceptionOccurred = (signal == kExceptionOccurred); + if (combined || exceptionOccurred) { + detach(request, state, exceptionOccurred, storage); + } + + // if we are just coming out of a futex call, then it means that the next + // waiter we are responsible for is also a waiter waiting on a futex, so + // we return that list in the list of ready threads. We will be waking up + // the ready threads on unlock no matter what + return {/* next */ extractPtr<Waiter<Atomic>>(next), + /* expected */ expected, + /* timedWaiter */ timedWaiter, + /* combined */ combineRequested && (combined || exceptionOccurred), + /* waker */ state.metadata_.waker_, + /* waiters */ extractPtr<Waiter<Atomic>>(state.metadata_.waiters_), + /* ready */ nextSleeper}; + } +} + +inline bool preempted(std::uint64_t value, std::chrono::nanoseconds now) { + auto currentTime = recover(strip(now)); + auto nodeTime = recover(value); + auto preempted = + (currentTime > nodeTime + kScheduledAwaySpinThreshold.count()) && + (nodeTime != recover(strip(std::chrono::nanoseconds::max()))); + + // we say that the thread has been preempted if its timestamp says so, and + // also if it is neither uninitialized nor skipped + assert(value != kSkipped); + return (preempted) && (value != kUninitialized) && + (value != kCombineUninitialized); +} + +inline bool isSleeper(std::uintptr_t value) { + return (value == kAboutToWait); +} + +inline bool isInitialized(std::uintptr_t value) { + return (value != kUninitialized) && (value != kCombineUninitialized); +} + +inline bool isCombiner(std::uintptr_t value) { + auto mode = (value & 0xff); + return (mode == kCombineWaiting) || (mode == kCombineUninitialized); +} + +inline bool isWaitingCombiner(std::uintptr_t value) { + return (value & 0xff) == kCombineWaiting; +} + +template <typename Waiter> +CombineFunction loadTask(Waiter* current, std::uintptr_t value) { + // if we know that the waiter is a combiner of some sort, it is safe to read + // and copy the value of the function in the waiter struct, since we know + // that a waiter would have set it before enqueueing + if (isCombiner(value)) { + return current->function_; + } + + return nullptr; +} + +template <typename Waiter> +void transferCurrentException(Waiter* waiter) { + assert(std::current_exception()); + new (&waiter->storage_) std::exception_ptr(std::current_exception()); + waiter->futex_.store(kExceptionOccurred, std::memory_order_release); +} + +template <template <typename> class Atomic> +inline std::uintptr_t tryCombine( + Waiter<Atomic>* waiter, + std::uintptr_t value, + std::uintptr_t next, + std::uint64_t iteration, + std::chrono::nanoseconds now, + CombineFunction task) { +#ifndef ROCKSDB_LITE + // if the waiter has asked for a combine operation, we should combine its + // critical section and move on to the next waiter + // + // the waiter is combinable if the following conditions are satisfied + // + // 1) the state in the
futex word is not uninitialized (kUninitialized) + // 2) it has a valid combine function + // 3) we are not past the limit of the number of combines we can perform + // or the waiter thread has been preempted. If the waiter gets preempted, + // it's better to just execute their critical section before moving on, + // as they will have to re-queue themselves after preemption anyway, + // leading to further delays in critical section completion + // + // if all the above are satisfied, then we can combine the critical section. + // Note that if the waiter is in a combinable state, that means that it had + // finished its writes to both the task and the next_ value. And observing + // a waiting state also means that we have acquired the writes to the other + // members of the waiter struct, so it's fine to use those values here + if (isWaitingCombiner(value) && + (iteration <= kMaxCombineIterations || preempted(value, now))) { + try { + task(); + waiter->futex_.store(kCombined, std::memory_order_release); + } catch (...) { + transferCurrentException(waiter); + } + return next; + } +#endif // ROCKSDB_LITE + return 0; +} + +template <typename Waiter> +inline std::uintptr_t tryWake( + bool publishing, + Waiter* waiter, + std::uintptr_t value, + std::uintptr_t next, + std::uintptr_t waker, + Waiter*& sleepers, + std::uint64_t iteration, + CombineFunction task) { + // try and combine the waiter's request first, if that succeeds that means + // we have successfully executed their critical section and can move on to + // the rest of the chain + auto now = time(); + if (tryCombine(waiter, value, next, iteration, now, task)) { + return next; + } + + // first we see if we can wake the current thread that is spinning + if ((!publishing || !preempted(value, now)) && !isSleeper(value)) { + // the Metadata class should be trivially destructible as we use placement + // new to set the relevant metadata without calling any destructor. We + // need to use placement new because the class contains a futex, which is + // non-movable and non-copyable + using Metadata = _t<std::decay<decltype(waiter->metadata_)>>; + static_assert(std::is_trivially_destructible<Metadata>{}, ""); + + // we need release here because of the write to waker_ and also because we + // are unlocking the mutex, the thread we do the handoff to here should + // see the modified data + new (&waiter->metadata_) Metadata(waker, bit_cast<uintptr_t>(sleepers)); + waiter->futex_.store(kWake, std::memory_order_release); + return 0; + } + + // if the thread is not a sleeper, and we were not able to catch it before + // preemption, we can just return a false, it is safe to read next_ because + // the thread was preempted. Preemption signals can only come after the + // thread has set the next_ pointer, since the timestamp writes only start + // occurring after that point + // + // if a thread was preempted it must have stored next_ in the waiter struct, + // as the store to futex_ that resets the value from kUninitialized happens + // after the write to next + assert(publishing); + if (!isSleeper(value)) { + // go on to the next one + // + // Also, we need a memory_order_release here to prevent missed wakeups. A + // missed wakeup here can happen when we see that a thread had been + // preempted and skip it. Then go on to release the lock, and then when + // the thread which got skipped does an exchange on the central storage, + // still sees the locked bit, and never gets woken up + // + // Can we relax this?
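+    //
+    // the load of next_ below happens before the kSkipped store for the same
+    // reason given in wake() below: once the skipped waiter observes kSkipped
+    // it may retry the lock and re-enqueue with a fresh node, so this node
+    // cannot be relied upon after the store becomes visible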
+ assert(preempted(value, now)); + assert(!isCombiner(value)); + next = waiter->next_.load(std::memory_order_relaxed); + waiter->futex_.store(kSkipped, std::memory_order_release); + return next; + } + + // if we are here the thread is a sleeper + // + // we attempt to catch the thread before it goes to futex(). If we are able + // to catch the thread before it sleeps on a futex, we are done, and don't + // need to go any further + // + // if we are not able to catch the thread before it goes to futex, we + // collect the current thread in the list of sleeping threads represented by + // sleepers, and return the next thread in the list + // + // it is safe to read the next_ pointer in the waiter struct if we were + // unable to catch the thread before it went to futex() because we use + // acquire-release ordering for the exchange operation below. And if we see + // that the thread was already sleeping, we have synchronized with the write + // to next_ in the context of the sleeping thread + // + // Also we need to set the value of waiters_ and waker_ in the thread before + // doing the exchange because we need to pass on the list of sleepers in the + // event that we were able to catch the thread before it went to futex(). + // If we were unable to catch the thread before it slept, these fields will + // be ignored when the thread wakes up anyway + assert(isSleeper(value)); + waiter->metadata_.waker_ = waker; + waiter->metadata_.waiters_ = folly::bit_cast<std::uintptr_t>(sleepers); + auto pre = + waiter->metadata_.sleeper_.exchange(kSleeping, std::memory_order_acq_rel); + + // if we were able to catch the thread before it went to sleep, we are done + if (pre != kSleeping) { + return 0; + } + + // otherwise return the value of next_, it is safe to read next + // because of the same logic as when a thread was preempted + // + // we also need to collect this sleeper in the list of sleepers being built + // up + next = waiter->next_.load(std::memory_order_relaxed); + auto head = folly::bit_cast<std::uintptr_t>(sleepers); + waiter->next_.store(head, std::memory_order_relaxed); + sleepers = waiter; + return next; +} + +template <typename Waiter> +bool wake( + bool publishing, + Waiter& waiter, + std::uintptr_t waker, + Waiter*& sleepers, + std::uint64_t iter) { + // loop till we find a node that is either at the end of the list (as + // specified by waker) or we find a node that is active (as specified by + // the last published timestamp of the node) + auto current = &waiter; + while (current) { + // it is important that we load the value of function and next_ after the + // initial acquire load. This is required because we need to synchronize + // with the construction of the waiter struct before reading from it + // + // the load from the next_ variable is an optimistic load that assumes + // that the waiting thread has probably gone to the waiting state. If the + // waiting thread is in the waiting state (as revealed by the acquire load + // from the futex word), we will see a well formed next_ value because it + // happens-before the release store to the futex word.
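+    //
+    // a rough sketch of the pairing this relies on (simplified; the waiter
+    // side lives in lockImplementation() and publish() above):
+    //
+    //   waiter thread                           waker thread (this loop)
+    //   initializes its node (function_, ...)   value = futex_.load(acquire)
+    //   next_.store(previous, relaxed)          next = next_.load(relaxed)
+    //   futex_.exchange(data, acq_rel)          task = loadTask(current, value)
+    //
+    // the release half of the waiter's exchange synchronizes with the acquire
+    // load of the futex word below, so the waiter's earlier writes are visible
+    // by the time we read them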
// The atomic load from + // next_ is an optimization to avoid branching before loading and prevent + // the compiler from eliding the load altogether (and using a pointer + // dereference when needed) + auto value = current->futex_.load(std::memory_order_acquire); + auto next = current->next_.load(std::memory_order_relaxed); + auto task = loadTask(current, value); + next = + tryWake(publishing, current, value, next, waker, sleepers, iter, task); + + // if there is no next node, we have managed to wake someone up and have + // successfully migrated the lock to another thread + if (!next) { + return true; + } + + // we need to read the value of the next node in the list before skipping + // it, this is because after we skip it the node might wake up and enqueue + // itself, and thereby gain a new next node + assert(publishing); + current = (next == waker) ? nullptr : extractPtr<Waiter>(next); + } + + return false; +} + +template <typename Atomic, typename Proxy, typename Sleepers> +bool tryUnlockClean(Atomic& state, Proxy& proxy, Sleepers sleepers) { + auto expected = proxy.expected_; + while (true) { + if (state.compare_exchange_strong( + expected, + kUnlocked, + std::memory_order_release, + std::memory_order_relaxed)) { + // if we were able to commit an unlock, we need to wake up the futex + // waiters, if any + doFutexWake(sleepers); + return true; + } + + // if we failed the compare_exchange_strong() above, we check to see if + // the failure was because of the presence of a timed waiter. If that + // was the case then we try one more time with the kTimedWaiter bit set + if (expected == (proxy.expected_ | kTimedWaiter)) { + proxy.timedWaiters_ = true; + continue; + } + + // otherwise break, we have a contention chain + return false; + } +} + +template <template <typename> class Atomic, bool Publish> +void DistributedMutex<Atomic, Publish>::unlock( + DistributedMutex::DistributedMutexStateProxy proxy) { + // we always wake up ready threads and timed waiters if we saw either + assert(proxy); + assert(!proxy.combined_); + SCOPE_EXIT { + doFutexWake(proxy.ready_); + wakeTimedWaiters(&state_, proxy.timedWaiters_); + }; + + // if there is a wait queue we are responsible for, try and start wakeups, + // don't bother with the mutex state + auto sleepers = proxy.waiters_; + if (proxy.next_) { + if (wake(Publish, *proxy.next_, proxy.waker_, sleepers, 0)) { + return; + } + + // At this point, if we are in the if statement, we were not the terminal + // node of the wakeup chain. Terminal nodes have the next_ pointer set to + // null in lock() + // + // So we need to pretend we were the end of the contention chain. Coming + // out of a contention chain always has the kLocked state set in the + // mutex. Unless there is another contention chain lined up, which does + // not matter since we are the terminal node anyway + proxy.expected_ = kLocked; + } + + for (std::uint64_t i = 0; true; ++i) { + // otherwise, since we don't have anyone we need to wake up, we try and + // release the mutex just as is + // + // if this is successful, we can return, the unlock was successful, we have + // committed a nice kUnlocked to the central storage, yay + if (tryUnlockClean(state_, proxy, sleepers)) { + return; + } + + // here we have a contention chain built up on the mutex. We grab the + // wait queue and start executing wakeups.
We leave a locked bit on the + // centralized storage and handoff control to the head of the queue + // + // we use memory_order_acq_rel here because we want to see the + // full well-initialized node that the other thread is waiting on + // + // If we are unable to wake the contention chain, it is possible that when + // we come back to looping here, a new contention chain will form. In + // that case we need to use kLocked as the waker_ value because the + // terminal node of the new chain will see kLocked in the central storage + auto head = state_.exchange(kLocked, std::memory_order_acq_rel); + recordTimedWaiterAndClearTimedBit(proxy.timedWaiters_, head); + auto next = extractPtr<Waiter<Atomic>>(head); + auto expected = folly::exchange(proxy.expected_, kLocked); + assert((head & kLocked) && (head != kLocked)); + if (wake(Publish, *next, expected, sleepers, i)) { + break; + } + } +} + +template <typename Atomic, typename Deadline, typename MakeProxy> +auto timedLock(Atomic& state, Deadline deadline, MakeProxy proxy) + -> decltype(std::declval<MakeProxy&>()(nullptr, kLocked, true)) { + while (true) { + // we put a bit on the central state to show that there is a timed waiter + // and go to sleep on the central state + // + // when this thread goes to unlock the mutex, it will expect a 0b1 in the + // mutex state (0b1, not 0b11), but then it will see that the value in the + // mutex state is 0b11 and not 0b1, meaning that there might have been + // another timed waiter. Even though there might not have been another + // timed waiter in the time being. This sort of missed wakeup is + // desirable for timed waiters; it helps avoid thundering herds of timed + // waiters. Because the mutex is packed in 8 bytes, and we need an + // address to be stored in those 8 bytes, we don't have much room to play + // with. The only other solution is to issue a futexWake(INT_MAX) to wake + // up all waiters when a clean unlock is committed, when a thread saw a + // timed waiter in the mutex previously. + // + // putting a 0b11 here works for a set of reasons that is a superset of + // the set of reasons that make it okay to put a kLocked (0b1) in the + // mutex state. Now that the thread has put (kTimedWaiter | kLocked) + // (0b11) in the mutex state and it expects a kLocked (0b1), there are two + // scenarios possible. The first being when there is no contention chain + // formation in the mutex from the time a timed waiter got a lock to + // unlock. In this case, the unlocker sees a 0b11 in the mutex state, + // adjusts to the presence of a timed waiter and cleanly unlocks with a + // kUnlocked (0b0). The second is when there is a contention chain. + // When a thread puts its address in the mutex and sees the timed bit, it + // records the presence of a timed waiter, and then pretends as if it + // hadn't seen the timed bit. So future contention chain releases, will + // terminate with a kLocked (0b1) and not a (kLocked | kTimedWaiter) + // (0b11). This just works naturally with the rest of the algorithm + // without incurring a perf hit for the regular non-timed case + // + // this strategy does however mean, that when threads try to acquire the + // mutex and all time out, there will be a wasteful syscall to issue wakeups + // to waiting threads. We don't do anything to try and minimize this + // + // we need to use a fetch_or() here because we need to convey two bits of + // information - 1, whether the mutex is locked or not, and 2, whether + // there is a timed waiter. 
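+    //
+    // a rough sketch of the packed word this logic relies on (derived from
+    // the constants used in this file, so treat it as illustrative):
+    //
+    //   bit 0 (kLocked)       set while the mutex is held
+    //   bit 1 (kTimedWaiter)  set when a timed waiter may be parked on the
+    //                         central state
+    //   remaining bits        zero, or the address of the most recently
+    //                         enqueued waiter node (stored with bit 0 set)
+    //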
The alternative here is to use the second bit + // to convey information only, we can use a fetch_set() on the second bit + // to make this faster, but that comes at the expense of requiring regular + // fast path lock attempts. Which use a single bit read-modify-write for + // better performance + auto data = kTimedWaiter | kLocked; + auto previous = state.fetch_or(data, std::memory_order_acquire); + if (!(previous & 0b1)) { + assert(!previous); + return proxy(nullptr, kLocked, true); + } + + // wait on the futex until signalled, if we get a timeout, the try_lock + // fails + auto result = atomic_wait_until(&state, previous | data, deadline); + if (result == std::cv_status::timeout) { + return proxy(nullptr, std::uintptr_t{0}, false); + } + } +} + +template <template <typename> class Atomic, bool TimePublishing> +template <typename Clock, typename Duration> +typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy +DistributedMutex<Atomic, TimePublishing>::try_lock_until( + const std::chrono::time_point<Clock, Duration>& deadline) { + // fast path for the uncontended case + // + // we get the time after trying to acquire the mutex because in the + // uncontended case, the price of getting the time is about 1/3 of the + // actual mutex acquisition. So we only pay the price of that extra bit of + // latency when needed + // + // this is even higher when VDSO is involved on architectures that do not + // offer a direct interface to the timestamp counter + if (auto state = try_lock()) { + return state; + } + + // fall back to the timed locking algorithm + using Proxy = DistributedMutexStateProxy; + return timedLock( + state_, + deadline, + [](Waiter<Atomic>* next, std::uintptr_t expected, bool timedWaiter) { + return Proxy{next, expected, timedWaiter}; + }); +} + +template <template <typename> class Atomic, bool TimePublishing> +template <typename Rep, typename Period> +typename DistributedMutex<Atomic, TimePublishing>::DistributedMutexStateProxy +DistributedMutex<Atomic, TimePublishing>::try_lock_for( + const std::chrono::duration<Rep, Period>& duration) { + // fast path for the uncontended case. Reasoning for doing this here is the + // same as in try_lock_until() + if (auto state = try_lock()) { + return state; + } + + // fall back to the timed locking algorithm + using Proxy = DistributedMutexStateProxy; + auto deadline = std::chrono::steady_clock::now() + duration; + return timedLock( + state_, + deadline, + [](Waiter<Atomic>* next, std::uintptr_t expected, bool timedWaiter) { + return Proxy{next, expected, timedWaiter}; + }); +} +} // namespace distributed_mutex +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex.cpp b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex.cpp new file mode 100644 index 000000000..28684ff29 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex.cpp @@ -0,0 +1,16 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
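+//
+// This translation unit provides the explicit instantiation of the default
+// mutex configuration (std::atomic as the underlying atomic template, with
+// time publishing enabled), so a concrete copy of the template's member
+// functions is emitted here.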
+ +#include <folly/synchronization/DistributedMutex.h> + +namespace folly { +namespace detail { +namespace distributed_mutex { + +template class DistributedMutex<std::atomic, true>; + +} // namespace distributed_mutex +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex.h b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex.h new file mode 100644 index 000000000..7acf04f45 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutex.h @@ -0,0 +1,304 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <atomic> +#include <chrono> +#include <cstdint> + +namespace folly { +namespace detail { +namespace distributed_mutex { + +/** + * DistributedMutex is a small, exclusive-only mutex that distributes the + * bookkeeping required for mutual exclusion in the stacks of threads that are + * contending for it. It has a mode that can combine critical sections when + * the mutex experiences contention; this allows the implementation to elide + * several expensive coherence and synchronization operations to boost + * throughput, surpassing even atomic instructions in some cases. It has a + * smaller memory footprint than std::mutex, a similar level of fairness + * (better in some cases) and no dependencies on heap allocation. It is the + * same width as a single pointer (8 bytes on most platforms), where on the + * other hand, std::mutex and pthread_mutex_t are both 40 bytes. It is larger + * than some of the other smaller locks, but the wide majority of cases using + * the small locks are wasting the difference in alignment padding anyway + * + * Benchmark results are good - at the time of writing, in the contended case, + * for lock/unlock based critical sections, it is about 4-5x faster than the + * smaller locks and about ~2x faster than std::mutex. When used in + * combinable mode, it is much faster than the alternatives, going more than + * 10x faster than the small locks, about 6x faster than std::mutex, 2-3x + * faster than flat combining and even faster than std::atomic<> in some + * cases, allowing more work with higher throughput. In the uncontended case, + * it is a few cycles faster than folly::MicroLock but a bit slower than + * std::mutex. DistributedMutex is also resistant to tail latency pathologies, + * unlike many of the other mutexes in use, which sleep for large time + * quantums to reduce spin churn; this causes elevated latencies for threads + * that enter the sleep cycle. The tail latency of lock acquisition can be up + * to 10x lower because of a more deterministic scheduling algorithm that is + * managed almost entirely in userspace. Detailed results comparing the + * throughput and latencies of different mutex implementations and atomics are + * at the bottom of folly/synchronization/test/SmallLocksBenchmark.cpp + * + * Theoretically, write locks promote concurrency when the critical sections + * are small as most of the work is done outside the lock. And indeed, + * performant concurrent applications go through several pains to limit the + * amount of work they do while holding a lock.
However, most times, the + * synchronization and scheduling overhead of a write lock in the critical + * path is so high that, after a certain point, making critical sections + * smaller does not actually increase the concurrency of the application and + * throughput plateaus. DistributedMutex moves this breaking point to the + * level of hardware atomic instructions, so applications keep getting + * concurrency even under very high contention. It does this by reducing + * cache misses and contention in userspace and in the kernel by making each + * thread wait on a thread local node and futex. When combined critical + * sections are used, DistributedMutex leverages template metaprogramming to + * allow the mutex to make better synchronization decisions based on the + * layout of the input and output data. This allows threads to keep working + * only on their own cache lines without requiring cache coherence operations + * when a mutex experiences heavy contention + * + * Non-timed mutex acquisitions are scheduled through intrusive LIFO + * contention chains. Each thread starts by spinning for a short quantum and + * falls back to two-phased sleeping. Enqueue operations are lock free and + * are piggybacked off mutex acquisition attempts. The LIFO behavior of a + * contention chain is good in the case where the mutex is held for a short + * amount of time, as the head of the chain is likely to not have slept on + * futex() after exhausting its spin quantum. This allows us to avoid + * unnecessary traversal and syscalls in the fast path with a higher + * probability. Even though the contention chains are LIFO, the mutex itself + * does not adhere to that scheduling policy globally. During contention, + * threads that fail to lock the mutex form a LIFO chain on the central mutex + * state; this chain is broken when a wakeup is scheduled, and future enqueue + * operations form a new chain. This makes the chains themselves LIFO, but + * preserves global fairness through a constant factor which is limited to the + * number of concurrent failed mutex acquisition attempts. This binds the + * last in first out behavior to the number of contending threads and helps + * prevent starvation and latency outliers + * + * This strategy of waking up wakers one by one in a queue does not scale well + * when the number of threads goes past the number of cores, at which point + * preemption causes elevated lock acquisition latencies. DistributedMutex + * implements a hardware timestamp publishing heuristic to detect and adapt to + * preemption. + * + * DistributedMutex does not have the typical mutex API - it does not satisfy + * the Lockable concept. It requires the user to maintain ephemeral bookkeeping + * and pass that bookkeeping around to unlock() calls. The API overhead, + * however, comes for free when you wrap this mutex for usage with + * std::unique_lock, which is the recommended usage (std::lock_guard, in + * optimized mode, has no performance benefit over std::unique_lock, so has been + * omitted). A benefit of this API is that it disallows incorrect usage where a + * thread unlocks a mutex that it does not own, thinking a mutex is functionally + * identical to a binary semaphore, which, unlike a mutex, is a suitable + * primitive for that usage + * + * Combined critical sections allow the implementation to elide several + * expensive operations during the lifetime of a critical section that cause + * slowdowns with regular lock/unlock based usage.
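+ *
+ * As a minimal sketch of combined usage (the mutex object and the counter
+ * here are illustrative, not part of this header):
+ *
+ *   DistributedMutex<> mutex;
+ *   std::uint64_t counter = 0;
+ *
+ *   // the lambda may execute on whichever thread currently holds the lock,
+ *   // and its return value is transferred back to the caller
+ *   auto snapshot = mutex.lock_combine([&]() { return ++counter; });
+ *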
DistributedMutex resolves + * contention through combining up to a constant factor of 2 contention chains + * to prevent issues with fairness and latency outliers, so we retain the + * fairness benefits of the lock/unlock implementation with no noticeable + * regression when switching between the lock methods. Despite the efficiency + * benefits, combined critical sections can only be used when the critical + * section does not depend on thread local state and does not introduce new + * dependencies between threads when the critical section gets combined. For + * example, locking or unlocking an unrelated mutex in a combined critical + * section might lead to unexpected results or even undefined behavior. This + * can happen if, for example, a different thread unlocks a mutex locked by + * the calling thread, leading to undefined behavior as the mutex might not + * allow locking and unlocking from unrelated threads (the posix and C++ + * standard disallow this usage for their mutexes) + * + * Timed locking through DistributedMutex is implemented through a centralized + * algorithm. The underlying contention-chains framework used in + * DistributedMutex is not abortable so we build abortability on the side. + * All waiters wait on the central mutex state, by setting and resetting bits + * within the pointer-length word. Since pointer length atomic integers are + * incompatible with futex(FUTEX_WAIT) on most systems, a non-standard + * implementation of futex() is used, where wait queues are managed in + * user-space (see p1135r0 and folly::ParkingLot for more) + */ +template < + template <typename> class Atomic = std::atomic, + bool TimePublishing = true> +class DistributedMutex { + public: + class DistributedMutexStateProxy; + + /** + * DistributedMutex is only default constructible, it can neither be moved + * nor copied + */ + DistributedMutex(); + DistributedMutex(DistributedMutex&&) = delete; + DistributedMutex(const DistributedMutex&) = delete; + DistributedMutex& operator=(DistributedMutex&&) = delete; + DistributedMutex& operator=(const DistributedMutex&) = delete; + + /** + * Acquires the mutex in exclusive mode + * + * This returns an ephemeral proxy that contains internal mutex state. This + * must be kept around for the duration of the critical section and passed + * subsequently to unlock() as an rvalue + * + * The proxy has no public API and is intended to be for internal usage only + * + * There are three notable cases where this method causes undefined + * behavior: + * + * - This is not a recursive mutex. Trying to acquire the mutex twice from + * the same thread without unlocking it results in undefined behavior + * - Thread, coroutine or fiber migrations from within a critical section + * are disallowed. This is because the implementation requires owning the + * stack frame through the execution of the critical section for both + * lock/unlock or combined critical sections. This also means that you + * cannot allow another thread, fiber or coroutine to unlock the mutex + * - This mutex cannot be used in a program compiled with segmented stacks, + * there is currently no way to detect the presence of segmented stacks + * at compile time or runtime, so we have no checks against this + */ + DistributedMutexStateProxy lock(); + + /** + * Unlocks the mutex + * + * The proxy returned by lock must be passed to unlock as an rvalue. 
No + * other option is possible here, since the proxy is only movable and not + * copyable + * + * It is undefined behavior to unlock from a thread that did not lock the + * mutex + */ + void unlock(DistributedMutexStateProxy); + + /** + * Try to acquire the mutex + * + * A non blocking version of the lock() function. The returned object is + * contextually convertible to bool. And has the value true when the mutex + * was successfully acquired, false otherwise + * + * This is allowed to return false spuriously, i.e. this is not guaranteed + * to return true even when the mutex is currently unlocked. In the event + * of a failed acquisition, this does not impose any memory ordering + * constraints for other threads + */ + DistributedMutexStateProxy try_lock(); + + /** + * Try to acquire the mutex, blocking for the given time + * + * Like try_lock(), this is allowed to fail spuriously and is not guaranteed + * to return false even when the mutex is currently unlocked. But only + * after the given time has elapsed + * + * try_lock_for() accepts a duration to block for, and try_lock_until() + * accepts an absolute wall clock time point + */ + template <typename Rep, typename Period> + DistributedMutexStateProxy try_lock_for( + const std::chrono::duration<Rep, Period>& duration); + + /** + * Try to acquire the lock, blocking until the given deadline + * + * Other than the difference in the meaning of the second argument, the + * semantics of this function are identical to try_lock_for() + */ + template <typename Clock, typename Duration> + DistributedMutexStateProxy try_lock_until( + const std::chrono::time_point<Clock, Duration>& deadline); + + /** + * Execute a task as a combined critical section + * + * Unlike traditional lock and unlock methods, lock_combine() enqueues the + * passed task for execution on any arbitrary thread. This allows the + * implementation to prevent cache line invalidations originating from + * expensive synchronization operations. The thread holding the lock is + * allowed to execute the task before unlocking, thereby forming a "combined + * critical section". + * + * This idea is inspired by Flat Combining. Flat Combining was introduced + * in the SPAA 2010 paper titled "Flat Combining and the + * Synchronization-Parallelism Tradeoff", by Danny Hendler, Itai Incze, Nir + * Shavit, and Moran Tzafrir - + * https://www.cs.bgu.ac.il/~hendlerd/papers/flat-combining.pdf. The + * implementation used here is significantly different from that described + * in the paper. The high-level goal of reducing the overhead of + * synchronization, however, is the same. + * + * Combined critical sections work best when kept simple. Since the + * critical section might be executed on any arbitrary thread, relying on + * things like thread local state or mutex locking and unlocking might cause + * incorrectness. Associativity is important. For example + * + * auto one = std::unique_lock{one_}; + * two_.lock_combine([&]() { + * if (bar()) { + * one.unlock(); + * } + * }); + * + * This has the potential to cause undefined behavior because mutexes are + * only meant to be acquired and released from the owning thread. Similar + * errors can arise from a combined critical section introducing implicit + * dependencies based on the state of the combining thread. 
For example + * + * // thread 1 + * auto one = std::unique_lock{one_}; + * auto two = std::unique_lock{two_}; + * + * // thread 2 + * two_.lock_combine([&]() { + * auto three = std::unique_lock{three_}; + * }); + * + * Here, because we used a combined critical section, we have introduced a + * dependency from one -> three that might not obvious to the reader + * + * This function is exception-safe. If the passed task throws an exception, + * it will be propagated to the caller, even if the task is running on + * another thread + * + * There are three notable cases where this method causes undefined + * behavior: + * + * - This is not a recursive mutex. Trying to acquire the mutex twice from + * the same thread without unlocking it results in undefined behavior + * - Thread, coroutine or fiber migrations from within a critical section + * are disallowed. This is because the implementation requires owning the + * stack frame through the execution of the critical section for both + * lock/unlock or combined critical sections. This also means that you + * cannot allow another thread, fiber or coroutine to unlock the mutex + * - This mutex cannot be used in a program compiled with segmented stacks, + * there is currently no way to detect the presence of segmented stacks + * at compile time or runtime, so we have no checks against this + */ + template <typename Task> + auto lock_combine(Task task) -> decltype(std::declval<const Task&>()()); + + private: + Atomic<std::uintptr_t> state_{0}; +}; + +} // namespace distributed_mutex +} // namespace detail + +/** + * Bring the default instantiation of DistributedMutex into the folly + * namespace without requiring any template arguments for public usage + */ +extern template class detail::distributed_mutex::DistributedMutex<>; +using DistributedMutex = detail::distributed_mutex::DistributedMutex<>; + +} // namespace folly + +#include <folly/synchronization/DistributedMutex-inl.h> +#include <folly/synchronization/DistributedMutexSpecializations.h> diff --git a/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutexSpecializations.h b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutexSpecializations.h new file mode 100644 index 000000000..451aa69bc --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/DistributedMutexSpecializations.h @@ -0,0 +1,39 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/synchronization/DistributedMutex.h> +#include <folly/synchronization/detail/ProxyLockable.h> + +/** + * Specializations for DistributedMutex allow us to use it like a normal + * mutex. 
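A minimal sketch of the lock_combine() API documented above; the Counter class is an assumption used only for illustration. The passed lambda may execute on whichever thread currently holds the mutex, so it touches only state reachable through its captures, and its return value is handed back to the calling thread:

#include <cstdint>

#include <folly/synchronization/DistributedMutex.h>

class Counter {
 public:
  std::uint64_t add(std::uint64_t delta) {
    // the critical section runs as a combined task: no thread-local state,
    // no locking or unlocking of unrelated mutexes inside it
    return mutex_.lock_combine([&] {
      value_ += delta;
      return value_;
    });
  }

 private:
  folly::DistributedMutex mutex_;
  std::uint64_t value_{0};
};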
Even though it has a non-usual interface + */ +namespace std { +template <template <typename> class Atom, bool TimePublishing> +class unique_lock< + ::folly::detail::distributed_mutex::DistributedMutex<Atom, TimePublishing>> + : public ::folly::detail::ProxyLockableUniqueLock< + ::folly::detail::distributed_mutex:: + DistributedMutex<Atom, TimePublishing>> { + public: + using ::folly::detail::ProxyLockableUniqueLock< + ::folly::detail::distributed_mutex:: + DistributedMutex<Atom, TimePublishing>>::ProxyLockableUniqueLock; +}; + +template <template <typename> class Atom, bool TimePublishing> +class lock_guard< + ::folly::detail::distributed_mutex::DistributedMutex<Atom, TimePublishing>> + : public ::folly::detail::ProxyLockableLockGuard< + ::folly::detail::distributed_mutex:: + DistributedMutex<Atom, TimePublishing>> { + public: + using ::folly::detail::ProxyLockableLockGuard< + ::folly::detail::distributed_mutex:: + DistributedMutex<Atom, TimePublishing>>::ProxyLockableLockGuard; +}; +} // namespace std diff --git a/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.cpp b/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.cpp new file mode 100644 index 000000000..74fba8e93 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.cpp @@ -0,0 +1,26 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <folly/synchronization/ParkingLot.h> + +#include <array> + +namespace folly { +namespace parking_lot_detail { + +Bucket& Bucket::bucketFor(uint64_t key) { + constexpr size_t const kNumBuckets = 4096; + + // Statically allocating this lets us use this in allocation-sensitive + // contexts. This relies on the assumption that std::mutex won't dynamically + // allocate memory, which we assume to be the case on Linux and iOS. + static Indestructible<std::array<Bucket, kNumBuckets>> gBuckets; + return (*gBuckets)[key % kNumBuckets]; +} + +std::atomic<uint64_t> idallocator{0}; + +} // namespace parking_lot_detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h b/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h new file mode 100644 index 000000000..bb324fb0a --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/ParkingLot.h @@ -0,0 +1,318 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include <atomic> +#include <condition_variable> +#include <mutex> + +#include <folly/hash/Hash.h> +#include <folly/Indestructible.h> +#include <folly/Unit.h> + +namespace folly { + +namespace parking_lot_detail { + +struct WaitNodeBase { + const uint64_t key_; + const uint64_t lotid_; + WaitNodeBase* next_{nullptr}; + WaitNodeBase* prev_{nullptr}; + + // tricky: hold both bucket and node mutex to write, either to read + bool signaled_; + std::mutex mutex_; + std::condition_variable cond_; + + WaitNodeBase(uint64_t key, uint64_t lotid) + : key_(key), lotid_(lotid), signaled_(false) {} + + template <typename Clock, typename Duration> + std::cv_status wait(std::chrono::time_point<Clock, Duration> deadline) { + std::cv_status status = std::cv_status::no_timeout; + std::unique_lock<std::mutex> nodeLock(mutex_); + while (!signaled_ && status != std::cv_status::timeout) { + if (deadline != std::chrono::time_point<Clock, Duration>::max()) { + status = cond_.wait_until(nodeLock, deadline); + } else { + cond_.wait(nodeLock); + } + } + return status; + } + + void wake() { + std::lock_guard<std::mutex> nodeLock(mutex_); + signaled_ = true; + cond_.notify_one(); + } + + bool signaled() { + return signaled_; + } +}; + +extern std::atomic<uint64_t> idallocator; + +// Our emulated futex uses 4096 lists of wait nodes. There are two levels +// of locking: a per-list mutex that controls access to the list and a +// per-node mutex, condvar, and bool that are used for the actual wakeups. +// The per-node mutex allows us to do precise wakeups without thundering +// herds. +struct Bucket { + std::mutex mutex_; + WaitNodeBase* head_; + WaitNodeBase* tail_; + std::atomic<uint64_t> count_; + + static Bucket& bucketFor(uint64_t key); + + void push_back(WaitNodeBase* node) { + if (tail_) { + assert(head_); + node->prev_ = tail_; + tail_->next_ = node; + tail_ = node; + } else { + tail_ = node; + head_ = node; + } + } + + void erase(WaitNodeBase* node) { + assert(count_.load(std::memory_order_relaxed) >= 1); + if (head_ == node && tail_ == node) { + assert(node->prev_ == nullptr); + assert(node->next_ == nullptr); + head_ = nullptr; + tail_ = nullptr; + } else if (head_ == node) { + assert(node->prev_ == nullptr); + assert(node->next_); + head_ = node->next_; + head_->prev_ = nullptr; + } else if (tail_ == node) { + assert(node->next_ == nullptr); + assert(node->prev_); + tail_ = node->prev_; + tail_->next_ = nullptr; + } else { + assert(node->next_); + assert(node->prev_); + node->next_->prev_ = node->prev_; + node->prev_->next_ = node->next_; + } + count_.fetch_sub(1, std::memory_order_relaxed); + } +}; + +} // namespace parking_lot_detail + +enum class UnparkControl { + RetainContinue, + RemoveContinue, + RetainBreak, + RemoveBreak, +}; + +enum class ParkResult { + Skip, + Unpark, + Timeout, +}; + +/* + * ParkingLot provides an interface that is similar to Linux's futex + * system call, but with additional functionality. It is implemented + * in a portable way on top of std::mutex and std::condition_variable. + * + * Additional reading: + * https://webkit.org/blog/6161/locking-in-webkit/ + * https://github.com/WebKit/webkit/blob/master/Source/WTF/wtf/ParkingLot.h + * https://locklessinc.com/articles/futex_cheat_sheet/ + * + * The main difference from futex is that park/unpark take lambdas, + * such that nearly anything can be done while holding the bucket + * lock. Unpark() lambda can also be used to wake up any number of + * waiters. 
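A hedged sketch of the park()/unpark() protocol described above, assuming a single atomic word whose address doubles as the key; the lot, word and function names are illustrative, not part of folly. The ToPark lambda runs under the bucket lock and decides whether the caller should sleep; the unpark lambda decides, per waiter, whether to remove and wake it:

#include <atomic>
#include <cstdint>

#include <folly/synchronization/ParkingLot.h>

folly::ParkingLot<std::uint32_t> lot;  // assumed global lot
std::atomic<std::uint32_t> word{0};    // assumed futex-style word

// waiter: block until word becomes non-zero
void waitForWord() {
  while (word.load(std::memory_order_acquire) == 0) {
    lot.park(
        &word,  // key: the address of the word
        0u,     // per-waiter data, unused in this sketch
        [&] { return word.load(std::memory_order_relaxed) == 0; },  // ToPark
        [] {});                                                     // PreWait
  }
}

// waker: publish the new value and wake every waiter parked on this key
void setAndWake() {
  word.store(1, std::memory_order_release);
  lot.unpark(&word, [](std::uint32_t) {
    return folly::UnparkControl::RemoveContinue;
  });
}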
+ * + * ParkingLot is templated on the data type, however, all ParkingLot + * implementations are backed by a single static array of buckets to + * avoid large memory overhead. Lambdas will only ever be called on + * the specific ParkingLot's nodes. + */ +template <typename Data = Unit> +class ParkingLot { + const uint64_t lotid_; + ParkingLot(const ParkingLot&) = delete; + + struct WaitNode : public parking_lot_detail::WaitNodeBase { + const Data data_; + + template <typename D> + WaitNode(uint64_t key, uint64_t lotid, D&& data) + : WaitNodeBase(key, lotid), data_(std::forward<D>(data)) {} + }; + + public: + ParkingLot() : lotid_(parking_lot_detail::idallocator++) {} + + /* Park API + * + * Key is almost always the address of a variable. + * + * ToPark runs while holding the bucket lock: usually this + * is a check to see if we can sleep, by checking waiter bits. + * + * PreWait is usually used to implement condition variable like + * things, such that you can unlock the condition variable's lock at + * the appropriate time. + */ + template <typename Key, typename D, typename ToPark, typename PreWait> + ParkResult park(const Key key, D&& data, ToPark&& toPark, PreWait&& preWait) { + return park_until( + key, + std::forward<D>(data), + std::forward<ToPark>(toPark), + std::forward<PreWait>(preWait), + std::chrono::steady_clock::time_point::max()); + } + + template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Clock, + typename Duration> + ParkResult park_until( + const Key key, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::time_point<Clock, Duration> deadline); + + template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Rep, + typename Period> + ParkResult park_for( + const Key key, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::duration<Rep, Period>& timeout) { + return park_until( + key, + std::forward<D>(data), + std::forward<ToPark>(toPark), + std::forward<PreWait>(preWait), + timeout + std::chrono::steady_clock::now()); + } + + /* + * Unpark API + * + * Key is the same uniqueaddress used in park(), and is used as a + * hash key for lookup of waiters. + * + * Unparker is a function that is given the Data parameter, and + * returns an UnparkControl. The Remove* results will remove and + * wake the waiter, the Ignore/Stop results will not, while stopping + * or continuing iteration of the waiter list. + */ + template <typename Key, typename Unparker> + void unpark(const Key key, Unparker&& func); +}; + +template <typename Data> +template < + typename Key, + typename D, + typename ToPark, + typename PreWait, + typename Clock, + typename Duration> +ParkResult ParkingLot<Data>::park_until( + const Key bits, + D&& data, + ToPark&& toPark, + PreWait&& preWait, + std::chrono::time_point<Clock, Duration> deadline) { + auto key = hash::twang_mix64(uint64_t(bits)); + auto& bucket = parking_lot_detail::Bucket::bucketFor(key); + WaitNode node(key, lotid_, std::forward<D>(data)); + + { + // A: Must be seq_cst. Matches B. 
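    // The seq_cst increment (A) pairs with the seq_cst load of count_ in
    // unpark() (B below) to give a store-load ordering between the two
    // sides: either unpark() observes this increment and takes the bucket
    // lock, or the update made before unpark() was called is visible to the
    // toPark() check below. With weaker orderings both could miss each
    // other, letting unpark() return early while this thread goes on to
    // block, i.e. a lost wakeup.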
+ bucket.count_.fetch_add(1, std::memory_order_seq_cst); + + std::unique_lock<std::mutex> bucketLock(bucket.mutex_); + + if (!std::forward<ToPark>(toPark)()) { + bucketLock.unlock(); + bucket.count_.fetch_sub(1, std::memory_order_relaxed); + return ParkResult::Skip; + } + + bucket.push_back(&node); + } // bucketLock scope + + std::forward<PreWait>(preWait)(); + + auto status = node.wait(deadline); + + if (status == std::cv_status::timeout) { + // it's not really a timeout until we unlink the unsignaled node + std::lock_guard<std::mutex> bucketLock(bucket.mutex_); + if (!node.signaled()) { + bucket.erase(&node); + return ParkResult::Timeout; + } + } + + return ParkResult::Unpark; +} + +template <typename Data> +template <typename Key, typename Func> +void ParkingLot<Data>::unpark(const Key bits, Func&& func) { + auto key = hash::twang_mix64(uint64_t(bits)); + auto& bucket = parking_lot_detail::Bucket::bucketFor(key); + // B: Must be seq_cst. Matches A. If true, A *must* see in seq_cst + // order any atomic updates in toPark() (and matching updates that + // happen before unpark is called) + if (bucket.count_.load(std::memory_order_seq_cst) == 0) { + return; + } + + std::lock_guard<std::mutex> bucketLock(bucket.mutex_); + + for (auto iter = bucket.head_; iter != nullptr;) { + auto node = static_cast<WaitNode*>(iter); + iter = iter->next_; + if (node->key_ == key && node->lotid_ == lotid_) { + auto result = std::forward<Func>(func)(node->data_); + if (result == UnparkControl::RemoveBreak || + result == UnparkControl::RemoveContinue) { + // we unlink, but waiter destroys the node + bucket.erase(node); + + node->wake(); + } + if (result == UnparkControl::RemoveBreak || + result == UnparkControl::RetainBreak) { + return; + } + } + } +} + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/WaitOptions.cpp b/src/rocksdb/third-party/folly/folly/synchronization/WaitOptions.cpp new file mode 100644 index 000000000..0c1fe2b93 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/WaitOptions.cpp @@ -0,0 +1,12 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <folly/synchronization/WaitOptions.h> + +namespace folly { + +constexpr std::chrono::nanoseconds WaitOptions::Defaults::spin_max; + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/WaitOptions.h b/src/rocksdb/third-party/folly/folly/synchronization/WaitOptions.h new file mode 100644 index 000000000..b28deb54d --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/WaitOptions.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <chrono> + +namespace folly { + +/// WaitOptions +/// +/// Various synchronization primitives as well as various concurrent data +/// structures built using them have operations which might wait. This type +/// represents a set of options for controlling such waiting. 
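A small usage sketch of the class declared just below, under the assumption that the caller only needs to tune spinning: spin_max() returns the options object itself, so a configuration can be built inline and passed to whichever wait routine accepts a WaitOptions (for example the spin helpers in detail/Spin.h):

#include <chrono>

#include <folly/synchronization/WaitOptions.h>

// disable pre-block spinning entirely, e.g. when cores are oversubscribed
const auto kNoSpin =
    folly::WaitOptions{}.spin_max(std::chrono::nanoseconds{0});

// spin a little longer than the 2 usec default before blocking
const auto kPatientSpin =
    folly::WaitOptions{}.spin_max(std::chrono::microseconds{10});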
+class WaitOptions { + public: + struct Defaults { + /// spin_max + /// + /// If multiple threads are actively using a synchronization primitive, + /// whether indirectly via a higher-level concurrent data structure or + /// directly, where the synchronization primitive has an operation which + /// waits and another operation which wakes the waiter, it is common for + /// wait and wake events to happen almost at the same time. In this state, + /// we lose big 50% of the time if the wait blocks immediately. + /// + /// We can improve our chances of being waked immediately, before blocking, + /// by spinning for a short duration, although we have to balance this + /// against the extra cpu utilization, latency reduction, power consumption, + /// and priority inversion effect if we end up blocking anyway. + /// + /// We use a default maximum of 2 usec of spinning. As partial consolation, + /// since spinning as implemented in folly uses the pause instruction where + /// available, we give a small speed boost to the colocated hyperthread. + /// + /// On circa-2013 devbox hardware, it costs about 7 usec to FUTEX_WAIT and + /// then be awoken. Spins on this hw take about 7 nsec, where all but 0.5 + /// nsec is the pause instruction. + static constexpr std::chrono::nanoseconds spin_max = + std::chrono::microseconds(2); + }; + + std::chrono::nanoseconds spin_max() const { + return spin_max_; + } + WaitOptions& spin_max(std::chrono::nanoseconds dur) { + spin_max_ = dur; + return *this; + } + + private: + std::chrono::nanoseconds spin_max_ = Defaults::spin_max; +}; + +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/detail/InlineFunctionRef.h b/src/rocksdb/third-party/folly/folly/synchronization/detail/InlineFunctionRef.h new file mode 100644 index 000000000..6782c792e --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/detail/InlineFunctionRef.h @@ -0,0 +1,219 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <cstdint> +#include <type_traits> + +#include <folly/Traits.h> +#include <folly/Utility.h> +#include <folly/functional/Invoke.h> +#include <folly/lang/Launder.h> + +namespace folly { +namespace detail { + +/** + * InlineFunctionRef is similar to folly::FunctionRef but has the additional + * benefit of being able to store the function it was instantiated with inline + * in a buffer of the given capacity. Inline storage is only used if the + * function object and a pointer (for type-erasure) are small enough to fit in + * the templated size. If there is not enough in-situ capacity for the + * callable, this just stores a reference to the function object like + * FunctionRef. + * + * This helps give a perf boost in the case where the data gets separated from + * the point of invocation. If, for example, at the point of invocation, the + * InlineFunctionRef object is not cached, a remote memory/cache read might be + * required to invoke the original callable. Customizable inline storage + * helps tune storage so we can store a type-erased callable with better + * performance and locality. A real-life example of this might be a + * folly::FunctionRef with a function pointer. The folly::FunctionRef would + * point to the function pointer object in a remote location. 
This causes a + * double-indirection at the point of invocation, and if that memory is dirty, + * or not cached, it would cause additional cache misses. On the other hand + * with InlineFunctionRef, inline storage would store the value of the + * function pointer, avoiding the need to do a remote lookup to fetch the + * value of the function pointer. + * + * To prevent misuse, InlineFunctionRef disallows construction from an lvalue + * callable. This is to prevent usage where a user relies on the callable's + * state after invocation through InlineFunctionRef. This has the potential + * to copy the callable into inline storage when the callable is small, so we + * might not use the same function when invoking, but rather a copy of it. + * + * Also note that InlineFunctionRef will always invoke the const qualified + * version of the call operator for any callable that is passed. Regardless + * of whether it has a non-const version. This is done to enforce the logical + * constraint of function state being immutable. + * + * This class is always trivially-copyable (and therefore + * trivially-destructible), making it suitable for use in a union without + * requiring manual destruction. + */ +template <typename FunctionType, std::size_t Size> +class InlineFunctionRef; + +template <typename ReturnType, typename... Args, std::size_t Size> +class InlineFunctionRef<ReturnType(Args...), Size> { + using Storage = + _t<std::aligned_storage<Size - sizeof(uintptr_t), sizeof(uintptr_t)>>; + using Call = ReturnType (*)(const Storage&, Args&&...); + + struct InSituTag {}; + struct RefTag {}; + + static_assert( + (Size % sizeof(uintptr_t)) == 0, + "Size has to be a multiple of sizeof(uintptr_t)"); + static_assert(Size >= 2 * sizeof(uintptr_t), "This doesn't work"); + static_assert(alignof(Call) == alignof(Storage), "Mismatching alignments"); + + // This defines a mode tag that is used in the construction of + // InlineFunctionRef to determine the storage and indirection method for the + // passed callable. + // + // This requires that the we pass in a type that is not ref-qualified. + template <typename Func> + using ConstructMode = _t<std::conditional< + folly::is_trivially_copyable<Func>{} && + (sizeof(Func) <= sizeof(Storage)) && + (alignof(Func) <= alignof(Storage)), + InSituTag, + RefTag>>; + + public: + /** + * InlineFunctionRef can be constructed from a nullptr, callable or another + * InlineFunctionRef with the same size. These are the constructors that + * don't take a callable. + * + * InlineFunctionRef is meant to be trivially copyable so we default the + * constructors and assignment operators. + */ + InlineFunctionRef(std::nullptr_t) : call_{nullptr} {} + InlineFunctionRef() : call_{nullptr} {} + InlineFunctionRef(const InlineFunctionRef& other) = default; + InlineFunctionRef(InlineFunctionRef&&) = default; + InlineFunctionRef& operator=(const InlineFunctionRef&) = default; + InlineFunctionRef& operator=(InlineFunctionRef&&) = default; + + /** + * Constructors from callables. + * + * If all of the following conditions are satisfied, then we store the + * callable in the inline storage: + * + * 1) The function has been passed as an rvalue, meaning that there is no + * use of the original in the user's code after it has been passed to + * us. + * 2) Size of the callable is less than the size of the inline storage + * buffer. + * 3) The callable is trivially constructible and destructible. 
+ * + * If any one of the above conditions is not satisfied, we fall back to + * reference semantics and store the function as a pointer, and add a level + * of indirection through type erasure. + */ + template < + typename Func, + _t<std::enable_if< + !std::is_same<_t<std::decay<Func>>, InlineFunctionRef>{} && + !std::is_reference<Func>{} && + std::is_convertible< + decltype(std::declval<Func&&>()(std::declval<Args&&>()...)), + ReturnType>{}>>* = nullptr> + InlineFunctionRef(Func&& func) { + // We disallow construction from lvalues, so assert that this is not a + // reference type. When invoked with an lvalue, Func is a lvalue + // reference type, when invoked with an rvalue, Func is not ref-qualified. + static_assert( + !std::is_reference<Func>{}, + "InlineFunctionRef cannot be used with lvalues"); + static_assert(std::is_rvalue_reference<Func&&>{}, ""); + construct(ConstructMode<Func>{}, folly::as_const(func)); + } + + /** + * The call operator uses the function pointer and a reference to the + * storage to do the dispatch. The function pointer takes care of the + * appropriate casting. + */ + ReturnType operator()(Args... args) const { + return call_(storage_, static_cast<Args&&>(args)...); + } + + /** + * We have a function engaged if the call function points to anything other + * than null. + */ + operator bool() const noexcept { + return call_; + } + + private: + friend class InlineFunctionRefTest; + + /** + * Inline storage constructor implementation. + */ + template <typename Func> + void construct(InSituTag, Func& func) { + using Value = _t<std::remove_reference<Func>>; + + // Assert that the following two assumptions are valid + // 1) fit in the storage space we have and match alignments, and + // 2) be invocable in a const context, it does not make sense to copy a + // callable into inline storage if it makes state local + // modifications. + static_assert(alignof(Value) <= alignof(Storage), ""); + static_assert(is_invocable<const _t<std::decay<Func>>, Args&&...>{}, ""); + static_assert(folly::is_trivially_copyable<Value>{}, ""); + + new (&storage_) Value{func}; + call_ = &callInline<Value>; + } + + /** + * Ref storage constructor implementation. This is identical to + * folly::FunctionRef. + */ + template <typename Func> + void construct(RefTag, Func& func) { + // store a pointer to the function + using Pointer = _t<std::add_pointer<_t<std::remove_reference<Func>>>>; + new (&storage_) Pointer{&func}; + call_ = &callPointer<Pointer>; + } + + template <typename Func> + static ReturnType callInline(const Storage& object, Args&&... args) { + // The only type of pointer allowed is a function pointer, no other + // pointer types are invocable. + static_assert( + !std::is_pointer<Func>::value || + std::is_function<_t<std::remove_pointer<Func>>>::value, + ""); + return (*folly::launder(reinterpret_cast<const Func*>(&object)))( + static_cast<Args&&>(args)...); + } + + template <typename Func> + static ReturnType callPointer(const Storage& object, Args&&... args) { + // When the function we were instantiated with was not trivial, the given + // pointer points to a pointer, which pointers to the callable. So we + // cast to a pointer and then to the pointee. 
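    // Concretely: for RefTag storage, construct() placed a Func, itself a
    // pointer type holding the callable's address, into storage_. Laundering
    // &object as const Func* therefore yields a pointer to that stored
    // pointer; the first dereference below recovers the callable's address
    // and the second reaches the callable for the invocation.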
+ static_assert(std::is_pointer<Func>::value, ""); + return (**folly::launder(reinterpret_cast<const Func*>(&object)))( + static_cast<Args&&>(args)...); + } + + Call call_; + Storage storage_; +}; + +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/detail/ProxyLockable-inl.h b/src/rocksdb/third-party/folly/folly/synchronization/detail/ProxyLockable-inl.h new file mode 100644 index 000000000..573330ceb --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/detail/ProxyLockable-inl.h @@ -0,0 +1,207 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Optional.h> +#include <folly/Portability.h> +#include <folly/Utility.h> + +#include <cassert> +#include <memory> +#include <mutex> +#include <stdexcept> +#include <utility> + +namespace folly { +namespace detail { +namespace proxylockable_detail { +template <typename Bool> +void throwIfAlreadyLocked(Bool&& locked) { + if (kIsDebug && locked) { + throw std::system_error{ + std::make_error_code(std::errc::resource_deadlock_would_occur)}; + } +} + +template <typename Bool> +void throwIfNotLocked(Bool&& locked) { + if (kIsDebug && !locked) { + throw std::system_error{ + std::make_error_code(std::errc::operation_not_permitted)}; + } +} + +template <typename Bool> +void throwIfNoMutex(Bool&& mutex) { + if (kIsDebug && !mutex) { + throw std::system_error{ + std::make_error_code(std::errc::operation_not_permitted)}; + } +} +} // namespace proxylockable_detail + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>::~ProxyLockableUniqueLock() { + if (owns_lock()) { + unlock(); + } +} + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>::ProxyLockableUniqueLock( + mutex_type& mtx) noexcept { + proxy_.emplace(mtx.lock()); + mutex_ = std::addressof(mtx); +} + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>::ProxyLockableUniqueLock( + ProxyLockableUniqueLock&& a) noexcept { + *this = std::move(a); +} + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>& ProxyLockableUniqueLock<Mutex>::operator=( + ProxyLockableUniqueLock&& other) noexcept { + proxy_ = std::move(other.proxy_); + mutex_ = folly::exchange(other.mutex_, nullptr); + return *this; +} + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>::ProxyLockableUniqueLock( + mutex_type& mtx, + std::defer_lock_t) noexcept { + mutex_ = std::addressof(mtx); +} + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>::ProxyLockableUniqueLock( + mutex_type& mtx, + std::try_to_lock_t) { + mutex_ = std::addressof(mtx); + if (auto state = mtx.try_lock()) { + proxy_.emplace(std::move(state)); + } +} + +template <typename Mutex> +template <typename Rep, typename Period> +ProxyLockableUniqueLock<Mutex>::ProxyLockableUniqueLock( + mutex_type& mtx, + const std::chrono::duration<Rep, Period>& duration) { + mutex_ = std::addressof(mtx); + if (auto state = mtx.try_lock_for(duration)) { + proxy_.emplace(std::move(state)); + } +} + +template <typename Mutex> +template <typename Clock, typename Duration> +ProxyLockableUniqueLock<Mutex>::ProxyLockableUniqueLock( + mutex_type& mtx, + const std::chrono::time_point<Clock, Duration>& time) { + mutex_ = std::addressof(mtx); + if (auto state = mtx.try_lock_until(time)) { + proxy_.emplace(std::move(state)); + } +} + +template 
<typename Mutex> +void ProxyLockableUniqueLock<Mutex>::lock() { + proxylockable_detail::throwIfAlreadyLocked(proxy_); + proxylockable_detail::throwIfNoMutex(mutex_); + + proxy_.emplace(mutex_->lock()); +} + +template <typename Mutex> +void ProxyLockableUniqueLock<Mutex>::unlock() { + proxylockable_detail::throwIfNoMutex(mutex_); + proxylockable_detail::throwIfNotLocked(proxy_); + + mutex_->unlock(std::move(*proxy_)); + proxy_.reset(); +} + +template <typename Mutex> +bool ProxyLockableUniqueLock<Mutex>::try_lock() { + proxylockable_detail::throwIfNoMutex(mutex_); + proxylockable_detail::throwIfAlreadyLocked(proxy_); + + if (auto state = mutex_->try_lock()) { + proxy_.emplace(std::move(state)); + return true; + } + + return false; +} + +template <typename Mutex> +template <typename Rep, typename Period> +bool ProxyLockableUniqueLock<Mutex>::try_lock_for( + const std::chrono::duration<Rep, Period>& duration) { + proxylockable_detail::throwIfNoMutex(mutex_); + proxylockable_detail::throwIfAlreadyLocked(proxy_); + + if (auto state = mutex_->try_lock_for(duration)) { + proxy_.emplace(std::move(state)); + return true; + } + + return false; +} + +template <typename Mutex> +template <typename Clock, typename Duration> +bool ProxyLockableUniqueLock<Mutex>::try_lock_until( + const std::chrono::time_point<Clock, Duration>& time) { + proxylockable_detail::throwIfNoMutex(mutex_); + proxylockable_detail::throwIfAlreadyLocked(proxy_); + + if (auto state = mutex_->try_lock_until(time)) { + proxy_.emplace(std::move(state)); + return true; + } + + return false; +} + +template <typename Mutex> +void ProxyLockableUniqueLock<Mutex>::swap( + ProxyLockableUniqueLock& other) noexcept { + std::swap(mutex_, other.mutex_); + std::swap(proxy_, other.proxy_); +} + +template <typename Mutex> +typename ProxyLockableUniqueLock<Mutex>::mutex_type* +ProxyLockableUniqueLock<Mutex>::mutex() const noexcept { + return mutex_; +} + +template <typename Mutex> +typename ProxyLockableUniqueLock<Mutex>::proxy_type* +ProxyLockableUniqueLock<Mutex>::proxy() const noexcept { + return proxy_ ? std::addressof(proxy_.value()) : nullptr; +} + +template <typename Mutex> +bool ProxyLockableUniqueLock<Mutex>::owns_lock() const noexcept { + return proxy_.has_value(); +} + +template <typename Mutex> +ProxyLockableUniqueLock<Mutex>::operator bool() const noexcept { + return owns_lock(); +} + +template <typename Mutex> +ProxyLockableLockGuard<Mutex>::ProxyLockableLockGuard(mutex_type& mtx) + : ProxyLockableUniqueLock<Mutex>{mtx} {} + +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/detail/ProxyLockable.h b/src/rocksdb/third-party/folly/folly/synchronization/detail/ProxyLockable.h new file mode 100644 index 000000000..af922daf4 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/detail/ProxyLockable.h @@ -0,0 +1,164 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <folly/Optional.h> + +#include <mutex> + +namespace folly { +namespace detail { + +/** + * ProxyLockable is a "concept" that is used usually for mutexes that don't + * return void, but rather a proxy object that contains data that should be + * passed to the unlock function. 
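A minimal sketch of a type that models this concept, assuming a trivial test-and-set lock; SpinProxyMutex and its Proxy are illustrative names, not part of folly. lock() returns a proxy and unlock() takes that proxy back by value, which is the shape the wrappers below expect:

#include <atomic>

class SpinProxyMutex {
 public:
  struct Proxy {
    bool engaged;
    explicit operator bool() const { return engaged; }
  };

  Proxy lock() {
    while (flag_.test_and_set(std::memory_order_acquire)) {
    }
    return Proxy{true};
  }

  Proxy try_lock() {
    return Proxy{!flag_.test_and_set(std::memory_order_acquire)};
  }

  void unlock(Proxy) {  // accepts the value returned by lock()
    flag_.clear(std::memory_order_release);
  }

 private:
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

// A type of this shape should be wrappable by
// folly::detail::ProxyLockableUniqueLock<SpinProxyMutex>.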
+ * + * This is in contrast with the normal Lockable concept that imposes no + * requirement on the return type of lock(), and requires an unlock() with no + * parameters. Here we require that lock() returns non-void and that unlock() + * accepts the return type of lock() by value, rvalue-reference or + * const-reference + * + * Here we define two classes, that can be used by the top level to implement + * specializations for std::unique_lock and std::lock_guard. Both + * ProxyLockableUniqueLock and ProxyLockableLockGuard implement the entire + * interface of std::unique_lock and std::lock_guard respectively + */ +template <typename Mutex> +class ProxyLockableUniqueLock { + public: + using mutex_type = Mutex; + using proxy_type = + _t<std::decay<decltype(std::declval<mutex_type>().lock())>>; + + /** + * Default constructor initializes the unique_lock to an empty state + */ + ProxyLockableUniqueLock() = default; + + /** + * Destructor releases the mutex if it is locked + */ + ~ProxyLockableUniqueLock(); + + /** + * Move constructor and move assignment operators take state from the other + * lock + */ + ProxyLockableUniqueLock(ProxyLockableUniqueLock&& other) noexcept; + ProxyLockableUniqueLock& operator=(ProxyLockableUniqueLock&&) noexcept; + + /** + * Locks the mutex, blocks until the mutex can be acquired. + * + * The mutex is guaranteed to be acquired after this function returns. + */ + ProxyLockableUniqueLock(mutex_type&) noexcept; + + /** + * Explicit locking constructors to control how the lock() method is called + * + * std::defer_lock_t causes the mutex to get tracked, but not locked + * std::try_to_lock_t causes try_lock() to be called. The current object is + * converts to true if the lock was successful + */ + ProxyLockableUniqueLock(mutex_type& mtx, std::defer_lock_t) noexcept; + ProxyLockableUniqueLock(mutex_type& mtx, std::try_to_lock_t); + + /** + * Timed locking constructors + */ + template <typename Rep, typename Period> + ProxyLockableUniqueLock( + mutex_type& mtx, + const std::chrono::duration<Rep, Period>& duration); + template <typename Clock, typename Duration> + ProxyLockableUniqueLock( + mutex_type& mtx, + const std::chrono::time_point<Clock, Duration>& time); + + /** + * Lock and unlock methods + * + * lock() and try_lock() throw if the mutex is already locked, or there is + * no mutex. unlock() throws if there is no mutex or if the mutex was not + * locked + */ + void lock(); + void unlock(); + bool try_lock(); + + /** + * Timed locking methods + * + * These throw if there was no mutex, or if the mutex was already locked + */ + template <typename Rep, typename Period> + bool try_lock_for(const std::chrono::duration<Rep, Period>& duration); + template <typename Clock, typename Duration> + bool try_lock_until(const std::chrono::time_point<Clock, Duration>& time); + + /** + * Swap this unique lock with the other one + */ + void swap(ProxyLockableUniqueLock& other) noexcept; + + /** + * Returns true if the unique lock contains a lock and also has acquired an + * exclusive lock successfully + */ + bool owns_lock() const noexcept; + explicit operator bool() const noexcept; + + /** + * mutex() return a pointer to the mutex if there is a contained mutex and + * proxy() returns a pointer to the contained proxy if the mutex is locked + * + * If the unique lock was not constructed with a mutex, then mutex() returns + * nullptr. 
If the mutex is not locked, then proxy() returns nullptr + */ + mutex_type* mutex() const noexcept; + proxy_type* proxy() const noexcept; + + private: + friend class ProxyLockableTest; + + /** + * If the optional has a value, the mutex is locked, if it is empty, it is + * not + */ + mutable folly::Optional<proxy_type> proxy_{}; + mutex_type* mutex_{nullptr}; +}; + +template <typename Mutex> +class ProxyLockableLockGuard : private ProxyLockableUniqueLock<Mutex> { + public: + using mutex_type = Mutex; + + /** + * Constructor locks the mutex, and destructor unlocks + */ + ProxyLockableLockGuard(mutex_type& mtx); + ~ProxyLockableLockGuard() = default; + + /** + * This class is not movable or assignable + * + * For more complicated usecases, consider the UniqueLock variant, which + * provides more options + */ + ProxyLockableLockGuard(const ProxyLockableLockGuard&) = delete; + ProxyLockableLockGuard(ProxyLockableLockGuard&&) = delete; + ProxyLockableLockGuard& operator=(ProxyLockableLockGuard&&) = delete; + ProxyLockableLockGuard& operator=(const ProxyLockableLockGuard&) = delete; +}; + +} // namespace detail +} // namespace folly + +#include <folly/synchronization/detail/ProxyLockable-inl.h> diff --git a/src/rocksdb/third-party/folly/folly/synchronization/detail/Sleeper.h b/src/rocksdb/third-party/folly/folly/synchronization/detail/Sleeper.h new file mode 100644 index 000000000..5bc98b333 --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/detail/Sleeper.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +/* + * @author Keith Adams <kma@fb.com> + * @author Jordan DeLong <delong.j@fb.com> + */ + +#include <cstdint> +#include <thread> + +#include <folly/portability/Asm.h> + +namespace folly { + +////////////////////////////////////////////////////////////////////// + +namespace detail { + +/* + * A helper object for the contended case. Starts off with eager + * spinning, and falls back to sleeping for small quantums. + */ +class Sleeper { + static const uint32_t kMaxActiveSpin = 4000; + + uint32_t spinCount; + + public: + Sleeper() noexcept : spinCount(0) {} + + static void sleep() noexcept { + /* + * Always sleep 0.5ms, assuming this will make the kernel put + * us down for whatever its minimum timer resolution is (in + * linux this varies by kernel version from 1ms to 10ms). + */ + std::this_thread::sleep_for(std::chrono::microseconds{500}); + } + + void wait() noexcept { + if (spinCount < kMaxActiveSpin) { + ++spinCount; + asm_volatile_pause(); + } else { + sleep(); + } + } +}; + +} // namespace detail +} // namespace folly + diff --git a/src/rocksdb/third-party/folly/folly/synchronization/detail/Spin.h b/src/rocksdb/third-party/folly/folly/synchronization/detail/Spin.h new file mode 100644 index 000000000..6eabc334e --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/detail/Spin.h @@ -0,0 +1,77 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
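The Sleeper helper above is the back-off policy for a contended spin loop; here is a minimal sketch assuming a toy test-and-set lock (SpinLock is an illustrative name, not part of folly):

#include <atomic>

#include <folly/synchronization/detail/Sleeper.h>

class SpinLock {
 public:
  void lock() noexcept {
    folly::detail::Sleeper sleeper;
    while (flag_.test_and_set(std::memory_order_acquire)) {
      // pause for roughly the first 4000 spins, then sleep 0.5 ms per retry
      sleeper.wait();
    }
  }

  void unlock() noexcept {
    flag_.clear(std::memory_order_release);
  }

 private:
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};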
+ +#pragma once + +#include <algorithm> +#include <chrono> +#include <thread> + +#include <folly/portability/Asm.h> +#include <folly/synchronization/WaitOptions.h> + +namespace folly { +namespace detail { + +enum class spin_result { + success, // condition passed + timeout, // exceeded deadline + advance, // exceeded current wait-options component timeout +}; + +template <typename Clock, typename Duration, typename F> +spin_result spin_pause_until( + std::chrono::time_point<Clock, Duration> const& deadline, + WaitOptions const& opt, + F f) { + if (opt.spin_max() <= opt.spin_max().zero()) { + return spin_result::advance; + } + + auto tbegin = Clock::now(); + while (true) { + if (f()) { + return spin_result::success; + } + + auto const tnow = Clock::now(); + if (tnow >= deadline) { + return spin_result::timeout; + } + + // Backward time discontinuity in Clock? revise pre_block starting point + tbegin = std::min(tbegin, tnow); + if (tnow >= tbegin + opt.spin_max()) { + return spin_result::advance; + } + + // The pause instruction is the polite way to spin, but it doesn't + // actually affect correctness to omit it if we don't have it. Pausing + // donates the full capabilities of the current core to its other + // hyperthreads for a dozen cycles or so. + asm_volatile_pause(); + } +} + +template <typename Clock, typename Duration, typename F> +spin_result spin_yield_until( + std::chrono::time_point<Clock, Duration> const& deadline, + F f) { + while (true) { + if (f()) { + return spin_result::success; + } + + auto const max = std::chrono::time_point<Clock, Duration>::max(); + if (deadline != max && Clock::now() >= deadline) { + return spin_result::timeout; + } + + std::this_thread::yield(); + } +} + +} // namespace detail +} // namespace folly diff --git a/src/rocksdb/third-party/folly/folly/synchronization/test/DistributedMutexTest.cpp b/src/rocksdb/third-party/folly/folly/synchronization/test/DistributedMutexTest.cpp new file mode 100644 index 000000000..5a9f58e3f --- /dev/null +++ b/src/rocksdb/third-party/folly/folly/synchronization/test/DistributedMutexTest.cpp @@ -0,0 +1,1142 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
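A hedged sketch of how the two spin helpers above are typically chained ahead of a real blocking wait: spin with pause until the WaitOptions spin budget is exhausted, then yield until the deadline. The waitUntilSet() function and the atomic flag are assumptions for illustration:

#include <atomic>
#include <chrono>

#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>

template <typename Clock, typename Duration>
bool waitUntilSet(
    std::atomic<bool>& flag,
    std::chrono::time_point<Clock, Duration> deadline,
    folly::WaitOptions opt = {}) {
  auto ready = [&] { return flag.load(std::memory_order_acquire); };

  auto res = folly::detail::spin_pause_until(deadline, opt, ready);
  if (res == folly::detail::spin_result::success) {
    return true;
  }
  if (res == folly::detail::spin_result::timeout) {
    return false;
  }

  // spin budget exhausted (spin_result::advance): yield until the deadline
  return folly::detail::spin_yield_until(deadline, ready) ==
      folly::detail::spin_result::success;
}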
+ +#include <folly/synchronization/DistributedMutex.h> +#include <folly/container/Array.h> +#include <folly/synchronization/Baton.h> + +#ifdef OS_AIX +#include "gtest/gtest.h" +#else +#include <gtest/gtest.h> +#endif + +#if !defined(ROCKSDB_LITE) && !defined(__ARM_ARCH) + +#include <chrono> +#include <cmath> +#include <thread> + +namespace folly { +namespace test { +template <template <typename> class Atomic> +using TestDistributedMutex = + folly::detail::distributed_mutex::DistributedMutex<Atomic, false>; +} // namespace test + +namespace { +constexpr auto kStressFactor = 1000; +constexpr auto kStressTestSeconds = 2; +constexpr auto kForever = std::chrono::hours{100}; + +int sum(int n) { + return (n * (n + 1)) / 2; +} + +template <template <typename> class Atom = std::atomic> +void basicNThreads(int numThreads, int iterations = kStressFactor) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& barrier = std::atomic<int>{0}; + auto&& threads = std::vector<std::thread>{}; + auto&& result = std::vector<int>{}; + + auto&& function = [&](int id) { + return [&, id] { + for (auto j = 0; j < iterations; ++j) { + auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex}; + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + result.push_back(id); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + }; + }; + + for (auto i = 1; i <= numThreads; ++i) { + threads.push_back(std::thread(function(i))); + } + for (auto& thread : threads) { + thread.join(); + } + + auto total = 0; + for (auto value : result) { + total += value; + } + EXPECT_EQ(total, sum(numThreads) * iterations); +} + +template <template <typename> class Atom = std::atomic> +void lockWithTryAndTimedNThreads( + int numThreads, + std::chrono::seconds duration) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& barrier = std::atomic<int>{0}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + + auto&& lockUnlockFunction = [&]() { + while (!stop.load()) { + auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex}; + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + }; + + auto tryLockFunction = [&]() { + while (!stop.load()) { + using Mutex = _t<std::decay<decltype(mutex)>>; + auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock}; + if (lck.try_lock()) { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + } + }; + + auto timedLockFunction = [&]() { + while (!stop.load()) { + using Mutex = _t<std::decay<decltype(mutex)>>; + auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock}; + if (lck.try_lock_for(kForever)) { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + } + }; + + for (auto i = 0; i < (numThreads / 3); ++i) { + threads.push_back(std::thread(lockUnlockFunction)); + } + for (auto i = 0; i < (numThreads / 3); ++i) { + threads.push_back(std::thread(tryLockFunction)); + } + for (auto i = 0; i < (numThreads / 3); ++i) { + threads.push_back(std::thread(timedLockFunction)); + } + + /* sleep override */ + std::this_thread::sleep_for(duration); + stop.store(true); + for (auto& thread : threads) { + 
thread.join(); + } +} + +template <template <typename> class Atom = std::atomic> +void combineNThreads(int numThreads, std::chrono::seconds duration) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& barrier = std::atomic<int>{0}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + + auto&& function = [&]() { + return [&] { + auto&& expected = std::uint64_t{0}; + auto&& local = std::atomic<std::uint64_t>{0}; + auto&& result = std::atomic<std::uint64_t>{0}; + while (!stop.load()) { + ++expected; + auto current = mutex.lock_combine([&]() { + result.fetch_add(1); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + return local.fetch_add(1); + }); + EXPECT_EQ(current, expected - 1); + } + + EXPECT_EQ(expected, result.load()); + }; + }; + + for (auto i = 1; i <= numThreads; ++i) { + threads.push_back(std::thread(function())); + } + + /* sleep override */ + std::this_thread::sleep_for(duration); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} + +template <template <typename> class Atom = std::atomic> +void combineWithLockNThreads(int numThreads, std::chrono::seconds duration) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& barrier = std::atomic<int>{0}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + + auto&& lockUnlockFunction = [&]() { + while (!stop.load()) { + auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex}; + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + }; + + auto&& combineFunction = [&]() { + auto&& expected = std::uint64_t{0}; + auto&& total = std::atomic<std::uint64_t>{0}; + + while (!stop.load()) { + ++expected; + auto current = mutex.lock_combine([&]() { + auto iteration = total.fetch_add(1); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + return iteration; + }); + + EXPECT_EQ(expected, current + 1); + } + + EXPECT_EQ(expected, total.load()); + }; + + for (auto i = 1; i < (numThreads / 2); ++i) { + threads.push_back(std::thread(combineFunction)); + } + for (auto i = 0; i < (numThreads / 2); ++i) { + threads.push_back(std::thread(lockUnlockFunction)); + } + + /* sleep override */ + std::this_thread::sleep_for(duration); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} + +template <template <typename> class Atom = std::atomic> +void combineWithTryLockNThreads(int numThreads, std::chrono::seconds duration) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& barrier = std::atomic<int>{0}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + + auto&& lockUnlockFunction = [&]() { + while (!stop.load()) { + auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex}; + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + 
std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + }; + + auto&& combineFunction = [&]() { + auto&& expected = std::uint64_t{0}; + auto&& total = std::atomic<std::uint64_t>{0}; + + while (!stop.load()) { + ++expected; + auto current = mutex.lock_combine([&]() { + auto iteration = total.fetch_add(1); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + return iteration; + }); + + EXPECT_EQ(expected, current + 1); + } + + EXPECT_EQ(expected, total.load()); + }; + + auto tryLockFunction = [&]() { + while (!stop.load()) { + using Mutex = _t<std::decay<decltype(mutex)>>; + auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock}; + if (lck.try_lock()) { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + } + }; + + for (auto i = 0; i < (numThreads / 3); ++i) { + threads.push_back(std::thread(lockUnlockFunction)); + } + for (auto i = 0; i < (numThreads / 3); ++i) { + threads.push_back(std::thread(combineFunction)); + } + for (auto i = 0; i < (numThreads / 3); ++i) { + threads.push_back(std::thread(tryLockFunction)); + } + + /* sleep override */ + std::this_thread::sleep_for(duration); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} + +template <template <typename> class Atom = std::atomic> +void combineWithLockTryAndTimedNThreads( + int numThreads, + std::chrono::seconds duration) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& barrier = std::atomic<int>{0}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + + auto&& lockUnlockFunction = [&]() { + while (!stop.load()) { + auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex}; + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + }; + + auto&& combineFunction = [&]() { + auto&& expected = std::uint64_t{0}; + auto&& total = std::atomic<std::uint64_t>{0}; + + while (!stop.load()) { + ++expected; + auto current = mutex.lock_combine([&]() { + auto iteration = total.fetch_add(1); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + + // return a non-trivially-copyable object that occupies all the + // storage we use to coalesce returns to test that codepath + return folly::make_array( + iteration, + iteration + 1, + iteration + 2, + iteration + 3, + iteration + 4, + iteration + 5); + }); + + EXPECT_EQ(expected, current[0] + 1); + EXPECT_EQ(expected, current[1]); + EXPECT_EQ(expected, current[2] - 1); + EXPECT_EQ(expected, current[3] - 2); + EXPECT_EQ(expected, current[4] - 3); + EXPECT_EQ(expected, current[5] - 4); + } + + EXPECT_EQ(expected, total.load()); + }; + + auto tryLockFunction = [&]() { + while (!stop.load()) { + using Mutex = _t<std::decay<decltype(mutex)>>; + auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock}; + if (lck.try_lock()) { + 
EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + } + }; + + auto timedLockFunction = [&]() { + while (!stop.load()) { + using Mutex = _t<std::decay<decltype(mutex)>>; + auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock}; + if (lck.try_lock_for(kForever)) { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + std::this_thread::yield(); + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + } + } + }; + + for (auto i = 0; i < (numThreads / 4); ++i) { + threads.push_back(std::thread(lockUnlockFunction)); + } + for (auto i = 0; i < (numThreads / 4); ++i) { + threads.push_back(std::thread(combineFunction)); + } + for (auto i = 0; i < (numThreads / 4); ++i) { + threads.push_back(std::thread(tryLockFunction)); + } + for (auto i = 0; i < (numThreads / 4); ++i) { + threads.push_back(std::thread(timedLockFunction)); + } + + /* sleep override */ + std::this_thread::sleep_for(duration); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace + +TEST(DistributedMutex, InternalDetailTestOne) { + auto value = 0; + auto ptr = reinterpret_cast<std::uintptr_t>(&value); + EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value); + ptr = ptr | 0b1; + EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value); +} + +TEST(DistributedMutex, Basic) { + auto&& mutex = folly::DistributedMutex{}; + auto state = mutex.lock(); + mutex.unlock(std::move(state)); +} + +TEST(DistributedMutex, BasicTryLock) { + auto&& mutex = folly::DistributedMutex{}; + + while (true) { + auto state = mutex.try_lock(); + if (state) { + mutex.unlock(std::move(state)); + break; + } + } +} + +TEST(DistributedMutex, StressTwoThreads) { + basicNThreads(2); +} +TEST(DistributedMutex, StressThreeThreads) { + basicNThreads(3); +} +TEST(DistributedMutex, StressFourThreads) { + basicNThreads(4); +} +TEST(DistributedMutex, StressFiveThreads) { + basicNThreads(5); +} +TEST(DistributedMutex, StressSixThreads) { + basicNThreads(6); +} +TEST(DistributedMutex, StressSevenThreads) { + basicNThreads(7); +} +TEST(DistributedMutex, StressEightThreads) { + basicNThreads(8); +} +TEST(DistributedMutex, StressSixteenThreads) { + basicNThreads(16); +} +TEST(DistributedMutex, StressThirtyTwoThreads) { + basicNThreads(32); +} +TEST(DistributedMutex, StressSixtyFourThreads) { + basicNThreads(64); +} +TEST(DistributedMutex, StressHundredThreads) { + basicNThreads(100); +} +TEST(DistributedMutex, StressHardwareConcurrencyThreads) { + basicNThreads(std::thread::hardware_concurrency()); +} + +TEST(DistributedMutex, StressThreeThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads(3, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads(6, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressTwelveThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads(12, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressTwentyFourThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads(24, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressFourtyEightThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads(48, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixtyFourThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads(64, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, 
StressHwConcThreadsLockTryAndTimed) { + lockWithTryAndTimedNThreads( + std::thread::hardware_concurrency(), + std::chrono::seconds{kStressTestSeconds}); +} + +TEST(DistributedMutex, StressTwoThreadsCombine) { + combineNThreads(2, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressThreeThreadsCombine) { + combineNThreads(3, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressFourThreadsCombine) { + combineNThreads(4, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressFiveThreadsCombine) { + combineNThreads(5, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixThreadsCombine) { + combineNThreads(6, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSevenThreadsCombine) { + combineNThreads(7, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressEightThreadsCombine) { + combineNThreads(8, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixteenThreadsCombine) { + combineNThreads(16, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressThirtyTwoThreadsCombine) { + combineNThreads(32, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixtyFourThreadsCombine) { + combineNThreads(64, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressHundredThreadsCombine) { + combineNThreads(100, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombine) { + combineNThreads( + std::thread::hardware_concurrency(), + std::chrono::seconds{kStressTestSeconds}); +} + +TEST(DistributedMutex, StressTwoThreadsCombineAndLock) { + combineWithLockNThreads(2, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressFourThreadsCombineAndLock) { + combineWithLockNThreads(4, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressEightThreadsCombineAndLock) { + combineWithLockNThreads(8, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixteenThreadsCombineAndLock) { + combineWithLockNThreads(16, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressThirtyTwoThreadsCombineAndLock) { + combineWithLockNThreads(32, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixtyFourThreadsCombineAndLock) { + combineWithLockNThreads(64, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineAndLock) { + combineWithLockNThreads( + std::thread::hardware_concurrency(), + std::chrono::seconds{kStressTestSeconds}); +} + +TEST(DistributedMutex, StressThreeThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads(3, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads(6, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressTwelveThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads(12, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads(24, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads(48, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads(64, 
std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineTryLockAndLock) { + combineWithTryLockNThreads( + std::thread::hardware_concurrency(), + std::chrono::seconds{kStressTestSeconds}); +} + +TEST(DistributedMutex, StressThreeThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + 3, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + 6, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressTwelveThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + 12, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + 24, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + 48, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + 64, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressHwConcurrencyThreadsCombineTryLockLockAndTimed) { + combineWithLockTryAndTimedNThreads( + std::thread::hardware_concurrency(), + std::chrono::seconds{kStressTestSeconds}); +} + +TEST(DistributedMutex, StressTryLock) { + auto&& mutex = folly::DistributedMutex{}; + + for (auto i = 0; i < kStressFactor; ++i) { + while (true) { + auto state = mutex.try_lock(); + if (state) { + mutex.unlock(std::move(state)); + break; + } + } + } +} + +TEST(DistributedMutex, TimedLockTimeout) { + auto&& mutex = folly::DistributedMutex{}; + auto&& start = folly::Baton<>{}; + auto&& done = folly::Baton<>{}; + + auto thread = std::thread{[&]() { + auto state = mutex.lock(); + start.post(); + done.wait(); + mutex.unlock(std::move(state)); + }}; + + start.wait(); + auto result = mutex.try_lock_for(std::chrono::milliseconds{10}); + EXPECT_FALSE(result); + done.post(); + thread.join(); +} + +TEST(DistributedMutex, TimedLockAcquireAfterUnlock) { + auto&& mutex = folly::DistributedMutex{}; + auto&& start = folly::Baton<>{}; + + auto thread = std::thread{[&]() { + auto state = mutex.lock(); + start.post(); + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + mutex.unlock(std::move(state)); + }}; + + start.wait(); + auto result = mutex.try_lock_for(kForever); + EXPECT_TRUE(result); + thread.join(); +} + +namespace { +template <template <typename> class Atom = std::atomic> +void stressTryLockWithConcurrentLocks( + int numThreads, + int iterations = kStressFactor) { + auto&& threads = std::vector<std::thread>{}; + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& atomic = std::atomic<std::uint64_t>{0}; + + for (auto i = 0; i < numThreads; ++i) { + threads.push_back(std::thread([&] { + for (auto j = 0; j < iterations; ++j) { + auto state = mutex.lock(); + EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1); + mutex.unlock(std::move(state)); + } + })); + } + + for (auto i = 0; i < iterations; ++i) { + if (auto state = mutex.try_lock()) { + EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1); + mutex.unlock(std::move(state)); + } + } + + for (auto& thread : threads) { + 
thread.join(); + } +} +} // namespace + +TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) { + stressTryLockWithConcurrentLocks(2); +} +TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) { + stressTryLockWithConcurrentLocks(4); +} +TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) { + stressTryLockWithConcurrentLocks(8); +} +TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) { + stressTryLockWithConcurrentLocks(16); +} +TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) { + stressTryLockWithConcurrentLocks(32); +} +TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) { + stressTryLockWithConcurrentLocks(64); +} + +namespace { +template <template <typename> class Atom = std::atomic> +void concurrentTryLocks(int numThreads, int iterations = kStressFactor) { + auto&& threads = std::vector<std::thread>{}; + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& atomic = std::atomic<std::uint64_t>{0}; + + for (auto i = 0; i < numThreads; ++i) { + threads.push_back(std::thread([&] { + for (auto j = 0; j < iterations; ++j) { + if (auto state = mutex.try_lock()) { + EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1); + mutex.unlock(std::move(state)); + } + } + })); + } + + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace + +TEST(DistributedMutex, StressTryLockWithTwoThreads) { + concurrentTryLocks(2); +} +TEST(DistributedMutex, StressTryLockFourThreads) { + concurrentTryLocks(4); +} +TEST(DistributedMutex, StressTryLockEightThreads) { + concurrentTryLocks(8); +} +TEST(DistributedMutex, StressTryLockSixteenThreads) { + concurrentTryLocks(16); +} +TEST(DistributedMutex, StressTryLockThirtyTwoThreads) { + concurrentTryLocks(32); +} +TEST(DistributedMutex, StressTryLockSixtyFourThreads) { + concurrentTryLocks(64); +} + +namespace { +class TestConstruction { + public: + TestConstruction() = delete; + explicit TestConstruction(int) { + defaultConstructs().fetch_add(1, std::memory_order_relaxed); + } + TestConstruction(TestConstruction&&) noexcept { + moveConstructs().fetch_add(1, std::memory_order_relaxed); + } + TestConstruction(const TestConstruction&) { + copyConstructs().fetch_add(1, std::memory_order_relaxed); + } + TestConstruction& operator=(const TestConstruction&) { + copyAssigns().fetch_add(1, std::memory_order_relaxed); + return *this; + } + TestConstruction& operator=(TestConstruction&&) { + moveAssigns().fetch_add(1, std::memory_order_relaxed); + return *this; + } + ~TestConstruction() { + destructs().fetch_add(1, std::memory_order_relaxed); + } + + static std::atomic<std::uint64_t>& defaultConstructs() { + static auto&& atomic = std::atomic<std::uint64_t>{0}; + return atomic; + } + static std::atomic<std::uint64_t>& moveConstructs() { + static auto&& atomic = std::atomic<std::uint64_t>{0}; + return atomic; + } + static std::atomic<std::uint64_t>& copyConstructs() { + static auto&& atomic = std::atomic<std::uint64_t>{0}; + return atomic; + } + static std::atomic<std::uint64_t>& moveAssigns() { + static auto&& atomic = std::atomic<std::uint64_t>{0}; + return atomic; + } + static std::atomic<std::uint64_t>& copyAssigns() { + static auto&& atomic = std::atomic<std::uint64_t>{0}; + return atomic; + } + static std::atomic<std::uint64_t>& destructs() { + static auto&& atomic = std::atomic<std::uint64_t>{0}; + return atomic; + } + + static void reset() { + 
defaultConstructs().store(0); + moveConstructs().store(0); + copyConstructs().store(0); + copyAssigns().store(0); + destructs().store(0); + } +}; +} // namespace + +TEST(DistributedMutex, TestAppropriateDestructionAndConstructionWithCombine) { + auto&& mutex = folly::DistributedMutex{}; + auto&& stop = std::atomic<bool>{false}; + + // test the simple return path to make sure that in the absence of + // contention, we get the right number of constructs and destructs + mutex.lock_combine([]() { return TestConstruction{1}; }); + auto moves = TestConstruction::moveConstructs().load(); + auto defaults = TestConstruction::defaultConstructs().load(); + EXPECT_EQ(TestConstruction::defaultConstructs().load(), 1); + EXPECT_TRUE(moves == 0 || moves == 1); + EXPECT_EQ(TestConstruction::destructs().load(), moves + defaults); + + // loop and make sure we were able to test the path where the critical + // section of the thread gets combined, and assert that we see the expected + // number of constructions and destructions + // + // this implements a timed backoff to test the combined path, so we use the + // smallest possible delay in tests + auto thread = std::thread{[&]() { + auto&& duration = std::chrono::milliseconds{10}; + while (!stop.load()) { + TestConstruction::reset(); + auto&& ready = folly::Baton<>{}; + auto&& release = folly::Baton<>{}; + + // make one thread start its critical section, signal and wait for + // another thread to enqueue, to test the combining path + auto innerThread = std::thread{[&]() { + mutex.lock_combine([&]() { + ready.post(); + release.wait(); + /* sleep override */ + std::this_thread::sleep_for(duration); + }); + }}; + + // wait for the thread to get in its critical section, then tell it to go + ready.wait(); + release.post(); + mutex.lock_combine([&]() { return TestConstruction{1}; }); + + innerThread.join(); + + // at this point we should have only one default construct, either 1, 3 + // or 4 move constructs, and the same number of destructions as + // constructions + auto innerDefaults = TestConstruction::defaultConstructs().load(); + auto innerMoves = TestConstruction::moveConstructs().load(); + auto destructs = TestConstruction::destructs().load(); + EXPECT_EQ(innerDefaults, 1); + EXPECT_TRUE(innerMoves == 3 || innerMoves == 4 || innerMoves == 1); + EXPECT_EQ(destructs, innerMoves + innerDefaults); + EXPECT_EQ(TestConstruction::moveAssigns().load(), 0); + EXPECT_EQ(TestConstruction::copyAssigns().load(), 0); + + // increase duration by 100ms each iteration + duration = duration + std::chrono::milliseconds{100}; + } + }}; + + /* sleep override */ + std::this_thread::sleep_for(std::chrono::seconds{kStressTestSeconds}); + stop.store(true); + thread.join(); +} + +namespace { +template <template <typename> class Atom = std::atomic> +void concurrentLocksManyMutexes(int numThreads, std::chrono::seconds duration) { + using DMutex = folly::detail::distributed_mutex::DistributedMutex<Atom>; + const auto&& kNumMutexes = 10; + auto&& threads = std::vector<std::thread>{}; + auto&& mutexes = std::vector<DMutex>(kNumMutexes); + auto&& barriers = std::vector<std::atomic<std::uint64_t>>(kNumMutexes); + auto&& stop = std::atomic<bool>{false}; + + for (auto i = 0; i < numThreads; ++i) { + threads.push_back(std::thread([&] { + auto&& total = std::atomic<std::uint64_t>{0}; + auto&& expected = std::uint64_t{0}; + + for (auto j = 0; !stop.load(std::memory_order_relaxed); ++j) { + auto& mutex = mutexes[j % kNumMutexes]; + auto& barrier = barriers[j % kNumMutexes]; + + ++expected; + auto result =
mutex.lock_combine([&]() { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + return total.fetch_add(1, std::memory_order_relaxed); + }); + EXPECT_EQ(result, expected - 1); + } + + EXPECT_EQ(total.load(), expected); + })); + } + + /* sleep override */ + std::this_thread::sleep_for(duration); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace + +TEST(DistributedMutex, StressWithManyMutexesAlternatingTwoThreads) { + concurrentLocksManyMutexes(2, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressWithManyMutexesAlternatingFourThreads) { + concurrentLocksManyMutexes(4, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressWithManyMutexesAlternatingEightThreads) { + concurrentLocksManyMutexes(8, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressWithManyMutexesAlternatingSixteenThreads) { + concurrentLocksManyMutexes(16, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressWithManyMutexesAlternatingThirtyTwoThreads) { + concurrentLocksManyMutexes(32, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressWithManyMutexesAlternatingSixtyFourThreads) { + concurrentLocksManyMutexes(64, std::chrono::seconds{kStressTestSeconds}); +} + +namespace { +class ExceptionWithConstructionTrack : public std::exception { + public: + explicit ExceptionWithConstructionTrack(int id) + : id_{std::to_string(id)}, constructionTrack_{id} {} + + const char* what() const noexcept override { + return id_.c_str(); + } + + private: + std::string id_; + TestConstruction constructionTrack_; +}; +} // namespace + +TEST(DistributedMutex, TestExceptionPropagationUncontended) { + TestConstruction::reset(); + auto&& mutex = folly::DistributedMutex{}; + auto&& thread = std::thread{[&]() { + try { + mutex.lock_combine([&]() { throw ExceptionWithConstructionTrack{46}; }); + } catch (std::exception& exc) { + auto integer = std::stoi(exc.what()); + EXPECT_EQ(integer, 46); + EXPECT_GT(TestConstruction::defaultConstructs(), 0); + } + EXPECT_EQ( + TestConstruction::defaultConstructs(), TestConstruction::destructs()); + }}; + thread.join(); +} + +namespace { +template <template <typename> class Atom = std::atomic> +void concurrentExceptionPropagationStress( + int numThreads, + std::chrono::milliseconds t) { + // this test fails with a false negative under older versions of TSAN + // for some reason, so disable it when TSAN is enabled + if (folly::kIsSanitizeThread) { + return; + } + + TestConstruction::reset(); + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + auto&& barrier = std::atomic<std::uint64_t>{0}; + + for (auto i = 0; i < numThreads; ++i) { + threads.push_back(std::thread([&]() { + for (auto j = 0; !stop.load(); ++j) { + auto value = int{0}; + try { + value = mutex.lock_combine([&]() { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + + // we
only throw an exception once every 3 times + if (!(j % 3)) { + throw ExceptionWithConstructionTrack{j}; + } + + return j; + }); + } catch (std::exception& exc) { + value = std::stoi(exc.what()); + } + + EXPECT_EQ(value, j); + } + })); + } + + /* sleep override */ + std::this_thread::sleep_for(t); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace + +TEST(DistributedMutex, TestExceptionPropagationStressTwoThreads) { + concurrentExceptionPropagationStress( + 2, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, TestExceptionPropagationStressFourThreads) { + concurrentExceptionPropagationStress( + 4, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, TestExceptionPropagationStressEightThreads) { + concurrentExceptionPropagationStress( + 8, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, TestExceptionPropagationStressSixteenThreads) { + concurrentExceptionPropagationStress( + 16, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, TestExceptionPropagationStressThirtyTwoThreads) { + concurrentExceptionPropagationStress( + 32, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, TestExceptionPropagationStressSixtyFourThreads) { + concurrentExceptionPropagationStress( + 64, std::chrono::seconds{kStressTestSeconds}); +} + +namespace { +std::array<std::uint64_t, 8> makeMonotonicArray(int start) { + auto array = std::array<std::uint64_t, 8>{}; + for (auto& element : array) { element = start++; } + return array; +} + +template <template <typename> class Atom = std::atomic> +void concurrentBigValueReturnStress( + int numThreads, + std::chrono::milliseconds t) { + auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{}; + auto&& threads = std::vector<std::thread>{}; + auto&& stop = std::atomic<bool>{false}; + auto&& barrier = std::atomic<std::uint64_t>{0}; + + for (auto i = 0; i < numThreads; ++i) { + threads.push_back(std::thread([&]() { + auto&& value = std::atomic<std::uint64_t>{0}; + + for (auto j = 0; !stop.load(); ++j) { + auto returned = mutex.lock_combine([&]() { + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0); + EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1); + std::this_thread::yield(); + // return an entire cacheline worth of data + auto current = value.fetch_add(1, std::memory_order_relaxed); + SCOPE_EXIT { + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1); + }; + EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2); + return makeMonotonicArray(static_cast<int>(current)); + }); + + auto expected = value.load() - 1; + for (auto& element : returned) { + EXPECT_EQ(element, expected++); + } + } + })); + } + + /* sleep override */ + std::this_thread::sleep_for(t); + stop.store(true); + for (auto& thread : threads) { + thread.join(); + } +} +} // namespace + +TEST(DistributedMutex, StressBigValueReturnTwoThreads) { + concurrentBigValueReturnStress(2, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressBigValueReturnFourThreads) { + concurrentBigValueReturnStress(4, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressBigValueReturnEightThreads) { + concurrentBigValueReturnStress(8, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressBigValueReturnSixteenThreads) { + concurrentBigValueReturnStress(16, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressBigValueReturnThirtyTwoThreads) { + 
concurrentBigValueReturnStress(32, std::chrono::seconds{kStressTestSeconds}); +} +TEST(DistributedMutex, StressBigValueReturnSixtyFourThreads) { + concurrentBigValueReturnStress(64, std::chrono::seconds{kStressTestSeconds}); +} + +} // namespace folly + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +int main(int /*argc*/, char** /*argv*/) { + printf("DistributedMutex is not supported in ROCKSDB_LITE or on ARM\n"); + return 0; +} +#endif // !ROCKSDB_LITE && !__ARM_ARCH
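Editor's note (not part of the upstream diff): the tests above exercise a small API surface on DistributedMutex, and the following minimal sketch restates that usage outside of gtest. It is illustrative only: the include path assumes the vendored header is reachable as folly/synchronization/DistributedMutex.h, the folly sources in this directory are assumed to be compiled into the target, and the local names (token, answer) are invented for the example.

#include <folly/synchronization/DistributedMutex.h>  // assumed vendored include path

#include <cassert>
#include <chrono>
#include <utility>

int main() {
  folly::DistributedMutex mutex;

  // lock() hands back a proof-of-ownership token that must be moved into unlock(),
  // exactly as the Basic test above does.
  auto state = mutex.lock();
  mutex.unlock(std::move(state));

  // try_lock() returns a token that evaluates to false when the lock was not acquired.
  if (auto token = mutex.try_lock()) {
    mutex.unlock(std::move(token));
  }

  // try_lock_for() waits up to the given duration before giving up.
  if (auto token = mutex.try_lock_for(std::chrono::milliseconds{10})) {
    mutex.unlock(std::move(token));
  }

  // lock_combine() runs the critical section (possibly combined onto the thread that
  // currently holds the lock) and forwards the callable's return value; exceptions
  // thrown inside propagate back to the caller, which the exception-propagation
  // tests above rely on.
  auto answer = mutex.lock_combine([]() { return 42; });
  assert(answer == 42);
  return 0;
}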