diff options
Diffstat (limited to '')
24 files changed, 7529 insertions, 0 deletions
diff --git a/vendor/sharded-slab/.cargo-checksum.json b/vendor/sharded-slab/.cargo-checksum.json new file mode 100644 index 000000000..2b594b836 --- /dev/null +++ b/vendor/sharded-slab/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"9c0e06d18ed28fa4c4f0e010e392c0b3562c8ada22fcc2f22133d654686c7f15","Cargo.toml":"5967b83356e0272aa80a10f37dbec0bc2968f08bee6626c83a5be74d32ce767e","IMPLEMENTATION.md":"c192c8288dffc0ecbf41d59ae8b1664ea9b323b3dd16f99f3041019b464fb55e","LICENSE":"eafbfa606bc005ed7fd2f623d65af17ffe4ef7c017221ac14405bf4140771ea9","README.md":"4406c839a896217ab92e2f17d9162270d57b24204bcbbb12284a66cc72f3a40a","benches/bench.rs":"dd2b45ae38b3480537649d8fef80d566aa71c21933b59f3bb2200eb1793adc3b","bin/loom.sh":"903241204befb0f1f6e7ea98e2a24244c98e8567595c03af42f65b78a4b231c1","src/cfg.rs":"b26b129bd1e27f279f76b4ce3a7e2d22b092077be50dab3cfde195d6ea662d2d","src/clear.rs":"765537aa6a4c65d9b42d6eb46b3dd801c2723b5c5b6bcdcabf671d63278f938e","src/implementation.rs":"dec81aebb88e006ab405256fb16e8ecca6b07c3582c1c803bdfcc2a84b1d21cf","src/iter.rs":"a6880250c411471842cac8b9d28b7cbba4600ef33958530fd39ab54fbe2fdbe0","src/lib.rs":"b881289761889e91f7963a5e6d7d12377dc4600cac7b63fd69b06c8e2054d178","src/macros.rs":"3d22bc75b29c551e8a8cde347e61ebe7c981862190ca82bbf5f529ff7ec1d1dd","src/page/mod.rs":"1c61f1b42e2fab7e402ee22edac0dc71101da833fe4139651f4c79139e42d277","src/page/slot.rs":"053f3090af39e5d5205a969b808bf29eda34ce31e1c40c79b9e4ccfd60220332","src/page/stack.rs":"549853d1a2c1c15987eee89d4327c4592f274c3cc1c5b695e419bd460f61c5c4","src/pool.rs":"cf04cec4e24984a40c9300dc86baf6fb79432be7cd3a2276ad69a0c932be4098","src/shard.rs":"dd2e97d1357f7792b56a37d09e9598ccecfab3e3f8e602a3cf562978fa20e888","src/sync.rs":"9ad35bbb515f7e56c62b9e5e048a147a3490b419e2ebe79f66047674953f1689","src/tests/loom_pool.rs":"363bb7fbd6b99b870cc51ae3eb5b5a9cc02e5ef7ba0e5b83a495d7f1aab58dcc","src/tests/loom_slab.rs":"3d230cdf0048f02fc5046038e1574bc9c5f61972952aa21d86992e856161d3e6","src/tests/mod.rs":"44808a038c645f16ec26658a835fee815850f06563a2e680136b0fdbe6e44e39","src/tid.rs":"6a87728bebb947a47a9593d0274d1c332f4f6c4ffc733211546dc447de55d913"},"package":"900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"}
\ No newline at end of file diff --git a/vendor/sharded-slab/CHANGELOG.md b/vendor/sharded-slab/CHANGELOG.md new file mode 100644 index 000000000..9dc7c2f70 --- /dev/null +++ b/vendor/sharded-slab/CHANGELOG.md @@ -0,0 +1,186 @@ +<a name="0.1.4"></a> +### 0.1.4 (2021-10-12) + + +#### Features + +* emit a nicer panic when thread count overflows `MAX_SHARDS` (#64) ([f1ed058a](https://github.com/hawkw/sharded-slab/commit/f1ed058a3ee296eff033fc0fb88f62a8b2f83f10)) + + + +<a name="0.1.3"></a> +### 0.1.3 (2021-08-02) + + +#### Bug Fixes + +* set up MSRV in CI (#61) ([dfcc9080](https://github.com/hawkw/sharded-slab/commit/dfcc9080a62d08e359f298a9ffb0f275928b83e4), closes [#60](https://github.com/hawkw/sharded-slab/issues/60)) +* **tests:** duplicate `hint` mod defs with loom ([0ce3fd91](https://github.com/hawkw/sharded-slab/commit/0ce3fd91feac8b4edb4f1ece6aebfc4ba4e50026)) + + + +<a name="0.1.2"></a> +### 0.1.2 (2021-08-01) + + +#### Bug Fixes + +* make debug assertions drop safe ([26d35a69](https://github.com/hawkw/sharded-slab/commit/26d35a695c9e5d7c62ab07cc5e66a0c6f8b6eade)) + +#### Features + +* improve panics on thread ID bit exhaustion ([9ecb8e61](https://github.com/hawkw/sharded-slab/commit/9ecb8e614f107f68b5c6ba770342ae72af1cd07b)) + + + +<a name="0.1.1"></a> +## 0.1.1 (2021-1-4) + + +#### Bug Fixes + +* change `loom` to an optional dependency ([9bd442b5](https://github.com/hawkw/sharded-slab/commit/9bd442b57bc56153a67d7325144ebcf303e0fe98)) + +<a name="0.1.0"></a> +## 0.1.0 (2020-10-20) + + +#### Bug Fixes + +* fix `remove` and `clear` returning true when the key is stale ([b52d38b2](https://github.com/hawkw/sharded-slab/commit/b52d38b2d2d3edc3a59d3dba6b75095bbd864266)) + +#### Breaking Changes + +* **Pool:** change `Pool::create` to return a mutable guard (#48) ([778065ea](https://github.com/hawkw/sharded-slab/commit/778065ead83523e0a9d951fbd19bb37fda3cc280), closes [#41](https://github.com/hawkw/sharded-slab/issues/41), [#16](https://github.com/hawkw/sharded-slab/issues/16)) +* **Slab:** rename `Guard` to `Entry` for consistency ([425ad398](https://github.com/hawkw/sharded-slab/commit/425ad39805ee818dc6b332286006bc92c8beab38)) + +#### Features + +* add missing `Debug` impls ([71a8883f](https://github.com/hawkw/sharded-slab/commit/71a8883ff4fd861b95e81840cb5dca167657fe36)) +* **Pool:** + * add `Pool::create_owned` and `OwnedRefMut` ([f7774ae0](https://github.com/hawkw/sharded-slab/commit/f7774ae0c5be99340f1e7941bde62f7044f4b4d8)) + * add `Arc<Pool>::get_owned` and `OwnedRef` ([3e566d91](https://github.com/hawkw/sharded-slab/commit/3e566d91e1bc8cc4630a8635ad24b321ec047fe7), closes [#29](https://github.com/hawkw/sharded-slab/issues/29)) + * change `Pool::create` to return a mutable guard (#48) ([778065ea](https://github.com/hawkw/sharded-slab/commit/778065ead83523e0a9d951fbd19bb37fda3cc280), closes [#41](https://github.com/hawkw/sharded-slab/issues/41), [#16](https://github.com/hawkw/sharded-slab/issues/16)) +* **Slab:** + * add `Arc<Slab>::get_owned` and `OwnedEntry` ([53a970a2](https://github.com/hawkw/sharded-slab/commit/53a970a2298c30c1afd9578268c79ccd44afba05), closes [#29](https://github.com/hawkw/sharded-slab/issues/29)) + * rename `Guard` to `Entry` for consistency ([425ad398](https://github.com/hawkw/sharded-slab/commit/425ad39805ee818dc6b332286006bc92c8beab38)) + * add `slab`-style `VacantEntry` API ([6776590a](https://github.com/hawkw/sharded-slab/commit/6776590adeda7bf4a117fb233fc09cfa64d77ced), closes [#16](https://github.com/hawkw/sharded-slab/issues/16)) + +#### Performance + +* allocate shard metadata lazily (#45) ([e543a06d](https://github.com/hawkw/sharded-slab/commit/e543a06d7474b3ff92df2cdb4a4571032135ff8d)) + + + +<a name="0.0.9"></a> +### 0.0.9 (2020-04-03) + + +#### Features + +* **Config:** validate concurrent refs ([9b32af58](9b32af58), closes [#21](21)) +* **Pool:** + * add `fmt::Debug` impl for `Pool` ([ffa5c7a0](ffa5c7a0)) + * add `Default` impl for `Pool` ([d2399365](d2399365)) + * add a sharded object pool for reusing heap allocations (#19) ([89734508](89734508), closes [#2](2), [#15](15)) +* **Slab::take:** add exponential backoff when spinning ([6b743a27](6b743a27)) + +#### Bug Fixes + +* incorrect wrapping when overflowing maximum ref count ([aea693f3](aea693f3), closes [#22](22)) + + + +<a name="0.0.8"></a> +### 0.0.8 (2020-01-31) + + +#### Bug Fixes + +* `remove` not adding slots to free lists ([dfdd7aee](dfdd7aee)) + + + +<a name="0.0.7"></a> +### 0.0.7 (2019-12-06) + + +#### Bug Fixes + +* **Config:** compensate for 0 being a valid TID ([b601f5d9](b601f5d9)) +* **DefaultConfig:** + * const overflow on 32-bit ([74d42dd1](74d42dd1), closes [#10](10)) + * wasted bit patterns on 64-bit ([8cf33f66](8cf33f66)) + + + +<a name="0.0.6"></a> +## 0.0.6 (2019-11-08) + + +#### Features + +* **Guard:** expose `key` method #8 ([748bf39b](748bf39b)) + + + +<a name="0.0.5"></a> +## 0.0.5 (2019-10-31) + + +#### Performance + +* consolidate per-slot state into one AtomicUsize (#6) ([f1146d33](f1146d33)) + +#### Features + +* add Default impl for Slab ([61bb3316](61bb3316)) + + + +<a name="0.0.4"></a> +## 0.0.4 (2019-21-30) + + +#### Features + +* prevent items from being removed while concurrently accessed ([872c81d1](872c81d1)) +* added `Slab::remove` method that marks an item to be removed when the last thread + accessing it finishes ([872c81d1](872c81d1)) + +#### Bug Fixes + +* nicer handling of races in remove ([475d9a06](475d9a06)) + +#### Breaking Changes + +* renamed `Slab::remove` to `Slab::take` ([872c81d1](872c81d1)) +* `Slab::get` now returns a `Guard` type ([872c81d1](872c81d1)) + + +<a name="0.0.3"></a> +## 0.0.3 (2019-07-30) + + +#### Bug Fixes + +* split local/remote to fix false sharing & potential races ([69f95fb0](69f95fb0)) +* set next pointer _before_ head ([cc7a0bf1](cc7a0bf1)) + +#### Breaking Changes + +* removed potentially racy `Slab::len` and `Slab::capacity` methods ([27af7d6c](27af7d6c)) + +<a name="0.0.2"></a> +## 0.0.2 (2019-03-30) + + +#### Bug Fixes + +* fix compilation failure in release mode ([617031da](617031da)) + + +<a name="0.0.1"></a> +## 0.0.1 (2019-02-30) + +- Initial release diff --git a/vendor/sharded-slab/Cargo.toml b/vendor/sharded-slab/Cargo.toml new file mode 100644 index 000000000..210621ade --- /dev/null +++ b/vendor/sharded-slab/Cargo.toml @@ -0,0 +1,51 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +name = "sharded-slab" +version = "0.1.4" +authors = ["Eliza Weisman <eliza@buoyant.io>"] +description = "A lock-free concurrent slab.\n" +homepage = "https://github.com/hawkw/sharded-slab" +documentation = "https://docs.rs/sharded-slab/0.1.4/sharded_slab" +readme = "README.md" +keywords = ["slab", "allocator", "lock-free", "atomic"] +categories = ["memory-management", "data-structures", "concurrency"] +license = "MIT" +repository = "https://github.com/hawkw/sharded-slab" +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[[bench]] +name = "bench" +harness = false +[dependencies.lazy_static] +version = "1" +[dev-dependencies.criterion] +version = "0.3" + +[dev-dependencies.loom] +version = "0.5" +features = ["checkpoint"] + +[dev-dependencies.proptest] +version = "1" + +[dev-dependencies.slab] +version = "0.4.2" +[target."cfg(loom)".dependencies.loom] +version = "0.5" +features = ["checkpoint"] +optional = true +[badges.maintenance] +status = "experimental" diff --git a/vendor/sharded-slab/IMPLEMENTATION.md b/vendor/sharded-slab/IMPLEMENTATION.md new file mode 100644 index 000000000..5494302cb --- /dev/null +++ b/vendor/sharded-slab/IMPLEMENTATION.md @@ -0,0 +1,135 @@ +Notes on `sharded-slab`'s implementation and design. + +# Design + +The sharded slab's design is strongly inspired by the ideas presented by +Leijen, Zorn, and de Moura in [Mimalloc: Free List Sharding in +Action][mimalloc]. In this report, the authors present a novel design for a +memory allocator based on a concept of _free list sharding_. + +Memory allocators must keep track of what memory regions are not currently +allocated ("free") in order to provide them to future allocation requests. +The term [_free list_][freelist] refers to a technique for performing this +bookkeeping, where each free block stores a pointer to the next free block, +forming a linked list. The memory allocator keeps a pointer to the most +recently freed block, the _head_ of the free list. To allocate more memory, +the allocator pops from the free list by setting the head pointer to the +next free block of the current head block, and returning the previous head. +To deallocate a block, the block is pushed to the free list by setting its +first word to the current head pointer, and the head pointer is set to point +to the deallocated block. Most implementations of slab allocators backed by +arrays or vectors use a similar technique, where pointers are replaced by +indices into the backing array. + +When allocations and deallocations can occur concurrently across threads, +they must synchronize accesses to the free list; either by putting the +entire allocator state inside of a lock, or by using atomic operations to +treat the free list as a lock-free structure (such as a [Treiber stack]). In +both cases, there is a significant performance cost — even when the free +list is lock-free, it is likely that a noticeable amount of time will be +spent in compare-and-swap loops. Ideally, the global synchronzation point +created by the single global free list could be avoided as much as possible. + +The approach presented by Leijen, Zorn, and de Moura is to introduce +sharding and thus increase the granularity of synchronization significantly. +In mimalloc, the heap is _sharded_ so that each thread has its own +thread-local heap. Objects are always allocated from the local heap of the +thread where the allocation is performed. Because allocations are always +done from a thread's local heap, they need not be synchronized. + +However, since objects can move between threads before being deallocated, +_deallocations_ may still occur concurrently. Therefore, Leijen et al. +introduce a concept of _local_ and _global_ free lists. When an object is +deallocated on the same thread it was originally allocated on, it is placed +on the local free list; if it is deallocated on another thread, it goes on +the global free list for the heap of the thread from which it originated. To +allocate, the local free list is used first; if it is empty, the entire +global free list is popped onto the local free list. Since the local free +list is only ever accessed by the thread it belongs to, it does not require +synchronization at all, and because the global free list is popped from +infrequently, the cost of synchronization has a reduced impact. A majority +of allocations can occur without any synchronization at all; and +deallocations only require synchronization when an object has left its +parent thread (a relatively uncommon case). + +[mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf +[freelist]: https://en.wikipedia.org/wiki/Free_list +[Treiber stack]: https://en.wikipedia.org/wiki/Treiber_stack + +# Implementation + +A slab is represented as an array of [`MAX_THREADS`] _shards_. A shard +consists of a vector of one or more _pages_ plus associated metadata. +Finally, a page consists of an array of _slots_, head indices for the local +and remote free lists. + +```text +┌─────────────┐ +│ shard 1 │ +│ │ ┌─────────────┐ ┌────────┐ +│ pages───────┼───▶│ page 1 │ │ │ +├─────────────┤ ├─────────────┤ ┌────▶│ next──┼─┐ +│ shard 2 │ │ page 2 │ │ ├────────┤ │ +├─────────────┤ │ │ │ │XXXXXXXX│ │ +│ shard 3 │ │ local_head──┼──┘ ├────────┤ │ +└─────────────┘ │ remote_head─┼──┐ │ │◀┘ + ... ├─────────────┤ │ │ next──┼─┐ +┌─────────────┐ │ page 3 │ │ ├────────┤ │ +│ shard n │ └─────────────┘ │ │XXXXXXXX│ │ +└─────────────┘ ... │ ├────────┤ │ + ┌─────────────┐ │ │XXXXXXXX│ │ + │ page n │ │ ├────────┤ │ + └─────────────┘ │ │ │◀┘ + └────▶│ next──┼───▶ ... + ├────────┤ + │XXXXXXXX│ + └────────┘ +``` + + +The size of the first page in a shard is always a power of two, and every +subsequent page added after the first is twice as large as the page that +preceeds it. + +```text + +pg. +┌───┐ ┌─┬─┐ +│ 0 │───▶ │ │ +├───┤ ├─┼─┼─┬─┐ +│ 1 │───▶ │ │ │ │ +├───┤ ├─┼─┼─┼─┼─┬─┬─┬─┐ +│ 2 │───▶ │ │ │ │ │ │ │ │ +├───┤ ├─┼─┼─┼─┼─┼─┼─┼─┼─┬─┬─┬─┬─┬─┬─┬─┐ +│ 3 │───▶ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ +└───┘ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘ +``` + +When searching for a free slot, the smallest page is searched first, and if +it is full, the search proceeds to the next page until either a free slot is +found or all available pages have been searched. If all available pages have +been searched and the maximum number of pages has not yet been reached, a +new page is then allocated. + +Since every page is twice as large as the previous page, and all page sizes +are powers of two, we can determine the page index that contains a given +address by shifting the address down by the smallest page size and +looking at how many twos places necessary to represent that number, +telling us what power of two page size it fits inside of. We can +determine the number of twos places by counting the number of leading +zeros (unused twos places) in the number's binary representation, and +subtracting that count from the total number of bits in a word. + +The formula for determining the page number that contains an offset is thus: + +```rust,ignore +WIDTH - ((offset + INITIAL_PAGE_SIZE) >> INDEX_SHIFT).leading_zeros() +``` + +where `WIDTH` is the number of bits in a `usize`, and `INDEX_SHIFT` is + +```rust,ignore +INITIAL_PAGE_SIZE.trailing_zeros() + 1; +``` + +[`MAX_THREADS`]: https://docs.rs/sharded-slab/latest/sharded_slab/trait.Config.html#associatedconstant.MAX_THREADS diff --git a/vendor/sharded-slab/LICENSE b/vendor/sharded-slab/LICENSE new file mode 100644 index 000000000..254e8565e --- /dev/null +++ b/vendor/sharded-slab/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2019 Eliza Weisman + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/sharded-slab/README.md b/vendor/sharded-slab/README.md new file mode 100644 index 000000000..ea4be64ea --- /dev/null +++ b/vendor/sharded-slab/README.md @@ -0,0 +1,218 @@ +# sharded-slab + +A lock-free concurrent slab. + +[![Crates.io][crates-badge]][crates-url] +[![Documentation][docs-badge]][docs-url] +[![CI Status][ci-badge]][ci-url] +[![GitHub License][license-badge]][license] +![maintenance status][maint-badge] + +[crates-badge]: https://img.shields.io/crates/v/sharded-slab.svg +[crates-url]: https://crates.io/crates/sharded-slab +[docs-badge]: https://docs.rs/sharded-slab/badge.svg +[docs-url]: https://docs.rs/sharded-slab/0.1.4/sharded_slab +[ci-badge]: https://github.com/hawkw/sharded-slab/workflows/CI/badge.svg +[ci-url]: https://github.com/hawkw/sharded-slab/actions?workflow=CI +[license-badge]: https://img.shields.io/crates/l/sharded-slab +[license]: LICENSE +[maint-badge]: https://img.shields.io/badge/maintenance-experimental-blue.svg + +Slabs provide pre-allocated storage for many instances of a single data +type. When a large number of values of a single type are required, +this can be more efficient than allocating each item individually. Since the +allocated items are the same size, memory fragmentation is reduced, and +creating and removing new items can be very cheap. + +This crate implements a lock-free concurrent slab, indexed by `usize`s. + +**Note**: This crate is currently experimental. Please feel free to use it in +your projects, but bear in mind that there's still plenty of room for +optimization, and there may still be some lurking bugs. + +## Usage + +First, add this to your `Cargo.toml`: + +```toml +sharded-slab = "0.1.1" +``` + +This crate provides two types, [`Slab`] and [`Pool`], which provide slightly +different APIs for using a sharded slab. + +[`Slab`] implements a slab for _storing_ small types, sharing them between +threads, and accessing them by index. New entries are allocated by [inserting] +data, moving it in by value. Similarly, entries may be deallocated by [taking] +from the slab, moving the value out. This API is similar to a `Vec<Option<T>>`, +but allowing lock-free concurrent insertion and removal. + +In contrast, the [`Pool`] type provides an [object pool] style API for +_reusing storage_. Rather than constructing values and moving them into +the pool, as with [`Slab`], [allocating an entry][create] from the pool +takes a closure that's provided with a mutable reference to initialize +the entry in place. When entries are deallocated, they are [cleared] in +place. Types which own a heap allocation can be cleared by dropping any +_data_ they store, but retaining any previously-allocated capacity. This +means that a [`Pool`] may be used to reuse a set of existing heap +allocations, reducing allocator load. + +[`Slab`]: https://docs.rs/sharded-slab/0.1.4/sharded_slab/struct.Slab.html +[inserting]: https://docs.rs/sharded-slab/0.1.4/sharded_slab/struct.Slab.html#method.insert +[taking]: https://docs.rs/sharded-slab/0.1.4/sharded_slab/struct.Slab.html#method.take +[`Pool`]: https://docs.rs/sharded-slab/0.1.4/sharded_slab/struct.Pool.html +[create]: https://docs.rs/sharded-slab/0.1.4/sharded_slab/struct.Pool.html#method.create +[cleared]: https://docs.rs/sharded-slab/0.1.4/sharded_slab/trait.Clear.html +[object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern + +### Examples + +Inserting an item into the slab, returning an index: + +```rust +use sharded_slab::Slab; +let slab = Slab::new(); + +let key = slab.insert("hello world").unwrap(); +assert_eq!(slab.get(key).unwrap(), "hello world"); +``` + +To share a slab across threads, it may be wrapped in an `Arc`: + +```rust +use sharded_slab::Slab; +use std::sync::Arc; +let slab = Arc::new(Slab::new()); + +let slab2 = slab.clone(); +let thread2 = std::thread::spawn(move || { + let key = slab2.insert("hello from thread two").unwrap(); + assert_eq!(slab2.get(key).unwrap(), "hello from thread two"); + key +}); + +let key1 = slab.insert("hello from thread one").unwrap(); +assert_eq!(slab.get(key1).unwrap(), "hello from thread one"); + +// Wait for thread 2 to complete. +let key2 = thread2.join().unwrap(); + +// The item inserted by thread 2 remains in the slab. +assert_eq!(slab.get(key2).unwrap(), "hello from thread two"); +``` + +If items in the slab must be mutated, a `Mutex` or `RwLock` may be used for +each item, providing granular locking of items rather than of the slab: + +```rust +use sharded_slab::Slab; +use std::sync::{Arc, Mutex}; +let slab = Arc::new(Slab::new()); + +let key = slab.insert(Mutex::new(String::from("hello world"))).unwrap(); + +let slab2 = slab.clone(); +let thread2 = std::thread::spawn(move || { + let hello = slab2.get(key).expect("item missing"); + let mut hello = hello.lock().expect("mutex poisoned"); + *hello = String::from("hello everyone!"); +}); + +thread2.join().unwrap(); + +let hello = slab.get(key).expect("item missing"); +let mut hello = hello.lock().expect("mutex poisoned"); +assert_eq!(hello.as_str(), "hello everyone!"); +``` + +## Comparison with Similar Crates + +- [`slab`]: Carl Lerche's `slab` crate provides a slab implementation with a + similar API, implemented by storing all data in a single vector. + + Unlike `sharded-slab`, inserting and removing elements from the slab requires + mutable access. This means that if the slab is accessed concurrently by + multiple threads, it is necessary for it to be protected by a `Mutex` or + `RwLock`. Items may not be inserted or removed (or accessed, if a `Mutex` is + used) concurrently, even when they are unrelated. In many cases, the lock can + become a significant bottleneck. On the other hand, `sharded-slab` allows + separate indices in the slab to be accessed, inserted, and removed + concurrently without requiring a global lock. Therefore, when the slab is + shared across multiple threads, this crate offers significantly better + performance than `slab`. + + However, the lock free slab introduces some additional constant-factor + overhead. This means that in use-cases where a slab is _not_ shared by + multiple threads and locking is not required, `sharded-slab` will likely + offer slightly worse performance. + + In summary: `sharded-slab` offers significantly improved performance in + concurrent use-cases, while `slab` should be preferred in single-threaded + use-cases. + +[`slab`]: https://crates.io/crates/slab + +## Safety and Correctness + +Most implementations of lock-free data structures in Rust require some +amount of unsafe code, and this crate is not an exception. In order to catch +potential bugs in this unsafe code, we make use of [`loom`], a +permutation-testing tool for concurrent Rust programs. All `unsafe` blocks +this crate occur in accesses to `loom` `UnsafeCell`s. This means that when +those accesses occur in this crate's tests, `loom` will assert that they are +valid under the C11 memory model across multiple permutations of concurrent +executions of those tests. + +In order to guard against the [ABA problem][aba], this crate makes use of +_generational indices_. Each slot in the slab tracks a generation counter +which is incremented every time a value is inserted into that slot, and the +indices returned by `Slab::insert` include the generation of the slot when +the value was inserted, packed into the high-order bits of the index. This +ensures that if a value is inserted, removed, and a new value is inserted +into the same slot in the slab, the key returned by the first call to +`insert` will not map to the new value. + +Since a fixed number of bits are set aside to use for storing the generation +counter, the counter will wrap around after being incremented a number of +times. To avoid situations where a returned index lives long enough to see the +generation counter wrap around to the same value, it is good to be fairly +generous when configuring the allocation of index bits. + +[`loom`]: https://crates.io/crates/loom +[aba]: https://en.wikipedia.org/wiki/ABA_problem + +## Performance + +These graphs were produced by [benchmarks] of the sharded slab implementation, +using the [`criterion`] crate. + +The first shows the results of a benchmark where an increasing number of +items are inserted and then removed into a slab concurrently by five +threads. It compares the performance of the sharded slab implementation +with a `RwLock<slab::Slab>`: + +<img width="1124" alt="Screen Shot 2019-10-01 at 5 09 49 PM" src="https://user-images.githubusercontent.com/2796466/66078398-cd6c9f80-e516-11e9-9923-0ed6292e8498.png"> + +The second graph shows the results of a benchmark where an increasing +number of items are inserted and then removed by a _single_ thread. It +compares the performance of the sharded slab implementation with an +`RwLock<slab::Slab>` and a `mut slab::Slab`. + +<img width="925" alt="Screen Shot 2019-10-01 at 5 13 45 PM" src="https://user-images.githubusercontent.com/2796466/66078469-f0974f00-e516-11e9-95b5-f65f0aa7e494.png"> + +These benchmarks demonstrate that, while the sharded approach introduces +a small constant-factor overhead, it offers significantly better +performance across concurrent accesses. + +[benchmarks]: https://github.com/hawkw/sharded-slab/blob/master/benches/bench.rs +[`criterion`]: https://crates.io/crates/criterion + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in this project by you, shall be licensed as MIT, without any +additional terms or conditions. diff --git a/vendor/sharded-slab/benches/bench.rs b/vendor/sharded-slab/benches/bench.rs new file mode 100644 index 000000000..c95bd4e38 --- /dev/null +++ b/vendor/sharded-slab/benches/bench.rs @@ -0,0 +1,181 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use std::{ + sync::{Arc, Barrier, RwLock}, + thread, + time::{Duration, Instant}, +}; + +#[derive(Clone)] +struct MultithreadedBench<T> { + start: Arc<Barrier>, + end: Arc<Barrier>, + slab: Arc<T>, +} + +impl<T: Send + Sync + 'static> MultithreadedBench<T> { + fn new(slab: Arc<T>) -> Self { + Self { + start: Arc::new(Barrier::new(5)), + end: Arc::new(Barrier::new(5)), + slab, + } + } + + fn thread(&self, f: impl FnOnce(&Barrier, &T) + Send + 'static) -> &Self { + let start = self.start.clone(); + let end = self.end.clone(); + let slab = self.slab.clone(); + thread::spawn(move || { + f(&*start, &*slab); + end.wait(); + }); + self + } + + fn run(&self) -> Duration { + self.start.wait(); + let t0 = Instant::now(); + self.end.wait(); + t0.elapsed() + } +} + +const N_INSERTIONS: &[usize] = &[100, 300, 500, 700, 1000, 3000, 5000]; + +fn insert_remove_local(c: &mut Criterion) { + // the 10000-insertion benchmark takes the `slab` crate about an hour to + // run; don't run this unless you're prepared for that... + // const N_INSERTIONS: &'static [usize] = &[100, 500, 1000, 5000, 10000]; + let mut group = c.benchmark_group("insert_remove_local"); + let g = group.measurement_time(Duration::from_secs(15)); + + for i in N_INSERTIONS { + g.bench_with_input(BenchmarkId::new("sharded_slab", i), i, |b, &i| { + b.iter_custom(|iters| { + let mut total = Duration::from_secs(0); + for _ in 0..iters { + let bench = MultithreadedBench::new(Arc::new(sharded_slab::Slab::new())); + let elapsed = bench + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = (0..i).map(|i| slab.insert(i).unwrap()).collect(); + for i in v { + slab.remove(i); + } + }) + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = (0..i).map(|i| slab.insert(i).unwrap()).collect(); + for i in v { + slab.remove(i); + } + }) + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = (0..i).map(|i| slab.insert(i).unwrap()).collect(); + for i in v { + slab.remove(i); + } + }) + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = (0..i).map(|i| slab.insert(i).unwrap()).collect(); + for i in v { + slab.remove(i); + } + }) + .run(); + total += elapsed; + } + total + }) + }); + g.bench_with_input(BenchmarkId::new("slab_biglock", i), i, |b, &i| { + b.iter_custom(|iters| { + let mut total = Duration::from_secs(0); + let i = i; + for _ in 0..iters { + let bench = MultithreadedBench::new(Arc::new(RwLock::new(slab::Slab::new()))); + let elapsed = bench + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = + (0..i).map(|i| slab.write().unwrap().insert(i)).collect(); + for i in v { + slab.write().unwrap().remove(i); + } + }) + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = + (0..i).map(|i| slab.write().unwrap().insert(i)).collect(); + for i in v { + slab.write().unwrap().remove(i); + } + }) + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = + (0..i).map(|i| slab.write().unwrap().insert(i)).collect(); + for i in v { + slab.write().unwrap().remove(i); + } + }) + .thread(move |start, slab| { + start.wait(); + let v: Vec<_> = + (0..i).map(|i| slab.write().unwrap().insert(i)).collect(); + for i in v { + slab.write().unwrap().remove(i); + } + }) + .run(); + total += elapsed; + } + total + }) + }); + } + group.finish(); +} + +fn insert_remove_single_thread(c: &mut Criterion) { + // the 10000-insertion benchmark takes the `slab` crate about an hour to + // run; don't run this unless you're prepared for that... + // const N_INSERTIONS: &'static [usize] = &[100, 500, 1000, 5000, 10000]; + let mut group = c.benchmark_group("insert_remove_single_threaded"); + + for i in N_INSERTIONS { + group.bench_with_input(BenchmarkId::new("sharded_slab", i), i, |b, &i| { + let slab = sharded_slab::Slab::new(); + b.iter(|| { + let v: Vec<_> = (0..i).map(|i| slab.insert(i).unwrap()).collect(); + for i in v { + slab.remove(i); + } + }); + }); + group.bench_with_input(BenchmarkId::new("slab_no_lock", i), i, |b, &i| { + let mut slab = slab::Slab::new(); + b.iter(|| { + let v: Vec<_> = (0..i).map(|i| slab.insert(i)).collect(); + for i in v { + slab.remove(i); + } + }); + }); + group.bench_with_input(BenchmarkId::new("slab_uncontended", i), i, |b, &i| { + let slab = RwLock::new(slab::Slab::new()); + b.iter(|| { + let v: Vec<_> = (0..i).map(|i| slab.write().unwrap().insert(i)).collect(); + for i in v { + slab.write().unwrap().remove(i); + } + }); + }); + } + group.finish(); +} + +criterion_group!(benches, insert_remove_local, insert_remove_single_thread); +criterion_main!(benches); diff --git a/vendor/sharded-slab/bin/loom.sh b/vendor/sharded-slab/bin/loom.sh new file mode 100755 index 000000000..244eebd9c --- /dev/null +++ b/vendor/sharded-slab/bin/loom.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# Runs Loom tests with defaults for Loom's configuration values. +# +# The tests are compiled in release mode to improve performance, but debug +# assertions are enabled. +# +# Any arguments to this script are passed to the `cargo test` invocation. + +RUSTFLAGS="${RUSTFLAGS} --cfg loom -C debug-assertions=on" \ + LOOM_MAX_PREEMPTIONS="${LOOM_MAX_PREEMPTIONS:-2}" \ + LOOM_CHECKPOINT_INTERVAL="${LOOM_CHECKPOINT_INTERVAL:-1}" \ + LOOM_LOG=1 \ + LOOM_LOCATION=1 \ + cargo test --release --lib "$@" diff --git a/vendor/sharded-slab/src/cfg.rs b/vendor/sharded-slab/src/cfg.rs new file mode 100644 index 000000000..b690ab2bf --- /dev/null +++ b/vendor/sharded-slab/src/cfg.rs @@ -0,0 +1,215 @@ +use crate::page::{ + slot::{Generation, RefCount}, + Addr, +}; +use crate::Pack; +use std::{fmt, marker::PhantomData}; +/// Configuration parameters which can be overridden to tune the behavior of a slab. +pub trait Config: Sized { + /// The maximum number of threads which can access the slab. + /// + /// This value (rounded to a power of two) determines the number of shards + /// in the slab. If a thread is created, accesses the slab, and then terminates, + /// its shard may be reused and thus does not count against the maximum + /// number of threads once the thread has terminated. + const MAX_THREADS: usize = DefaultConfig::MAX_THREADS; + /// The maximum number of pages in each shard in the slab. + /// + /// This value, in combination with `INITIAL_PAGE_SIZE`, determines how many + /// bits of each index are used to represent page addresses. + const MAX_PAGES: usize = DefaultConfig::MAX_PAGES; + /// The size of the first page in each shard. + /// + /// When a page in a shard has been filled with values, a new page + /// will be allocated that is twice as large as the previous page. Thus, the + /// second page will be twice this size, and the third will be four times + /// this size, and so on. + /// + /// Note that page sizes must be powers of two. If this value is not a power + /// of two, it will be rounded to the next power of two. + const INITIAL_PAGE_SIZE: usize = DefaultConfig::INITIAL_PAGE_SIZE; + /// Sets a number of high-order bits in each index which are reserved from + /// user code. + /// + /// Note that these bits are taken from the generation counter; if the page + /// address and thread IDs are configured to use a large number of bits, + /// reserving additional bits will decrease the period of the generation + /// counter. These should thus be used relatively sparingly, to ensure that + /// generation counters are able to effectively prevent the ABA problem. + const RESERVED_BITS: usize = 0; +} + +pub(crate) trait CfgPrivate: Config { + const USED_BITS: usize = Generation::<Self>::LEN + Generation::<Self>::SHIFT; + const INITIAL_SZ: usize = next_pow2(Self::INITIAL_PAGE_SIZE); + const MAX_SHARDS: usize = next_pow2(Self::MAX_THREADS - 1); + const ADDR_INDEX_SHIFT: usize = Self::INITIAL_SZ.trailing_zeros() as usize + 1; + + fn page_size(n: usize) -> usize { + Self::INITIAL_SZ * 2usize.pow(n as _) + } + + fn debug() -> DebugConfig<Self> { + DebugConfig { _cfg: PhantomData } + } + + fn validate() { + assert!( + Self::INITIAL_SZ.is_power_of_two(), + "invalid Config: {:#?}", + Self::debug(), + ); + assert!( + Self::INITIAL_SZ <= Addr::<Self>::BITS, + "invalid Config: {:#?}", + Self::debug() + ); + + assert!( + Generation::<Self>::BITS >= 3, + "invalid Config: {:#?}\ngeneration counter should be at least 3 bits!", + Self::debug() + ); + + assert!( + Self::USED_BITS <= WIDTH, + "invalid Config: {:#?}\ntotal number of bits per index is too large to fit in a word!", + Self::debug() + ); + + assert!( + WIDTH - Self::USED_BITS >= Self::RESERVED_BITS, + "invalid Config: {:#?}\nindices are too large to fit reserved bits!", + Self::debug() + ); + + assert!( + RefCount::<Self>::MAX > 1, + "invalid config: {:#?}\n maximum concurrent references would be {}", + Self::debug(), + RefCount::<Self>::MAX, + ); + } + + #[inline(always)] + fn unpack<A: Pack<Self>>(packed: usize) -> A { + A::from_packed(packed) + } + + #[inline(always)] + fn unpack_addr(packed: usize) -> Addr<Self> { + Self::unpack(packed) + } + + #[inline(always)] + fn unpack_tid(packed: usize) -> crate::Tid<Self> { + Self::unpack(packed) + } + + #[inline(always)] + fn unpack_gen(packed: usize) -> Generation<Self> { + Self::unpack(packed) + } +} +impl<C: Config> CfgPrivate for C {} + +/// Default slab configuration values. +#[derive(Copy, Clone)] +pub struct DefaultConfig { + _p: (), +} + +pub(crate) struct DebugConfig<C: Config> { + _cfg: PhantomData<fn(C)>, +} + +pub(crate) const WIDTH: usize = std::mem::size_of::<usize>() * 8; + +pub(crate) const fn next_pow2(n: usize) -> usize { + let pow2 = n.count_ones() == 1; + let zeros = n.leading_zeros(); + 1 << (WIDTH - zeros as usize - pow2 as usize) +} + +// === impl DefaultConfig === + +impl Config for DefaultConfig { + const INITIAL_PAGE_SIZE: usize = 32; + + #[cfg(target_pointer_width = "64")] + const MAX_THREADS: usize = 4096; + #[cfg(target_pointer_width = "32")] + // TODO(eliza): can we find enough bits to give 32-bit platforms more threads? + const MAX_THREADS: usize = 128; + + const MAX_PAGES: usize = WIDTH / 2; +} + +impl fmt::Debug for DefaultConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Self::debug().fmt(f) + } +} + +impl<C: Config> fmt::Debug for DebugConfig<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(std::any::type_name::<C>()) + .field("initial_page_size", &C::INITIAL_SZ) + .field("max_shards", &C::MAX_SHARDS) + .field("max_pages", &C::MAX_PAGES) + .field("used_bits", &C::USED_BITS) + .field("reserved_bits", &C::RESERVED_BITS) + .field("pointer_width", &WIDTH) + .field("max_concurrent_references", &RefCount::<C>::MAX) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util; + use crate::Slab; + + #[test] + #[cfg_attr(loom, ignore)] + #[should_panic] + fn validates_max_refs() { + struct GiantGenConfig; + + // Configure the slab with a very large number of bits for the generation + // counter. This will only leave 1 bit to use for the slot reference + // counter, which will fail to validate. + impl Config for GiantGenConfig { + const INITIAL_PAGE_SIZE: usize = 1; + const MAX_THREADS: usize = 1; + const MAX_PAGES: usize = 1; + } + + let _slab = Slab::<usize>::new_with_config::<GiantGenConfig>(); + } + + #[test] + #[cfg_attr(loom, ignore)] + fn big() { + let slab = Slab::new(); + + for i in 0..10000 { + println!("{:?}", i); + let k = slab.insert(i).expect("insert"); + assert_eq!(slab.get(k).expect("get"), i); + } + } + + #[test] + #[cfg_attr(loom, ignore)] + fn custom_page_sz() { + let slab = Slab::new_with_config::<test_util::TinyConfig>(); + + for i in 0..4096 { + println!("{}", i); + let k = slab.insert(i).expect("insert"); + assert_eq!(slab.get(k).expect("get"), i); + } + } +} diff --git a/vendor/sharded-slab/src/clear.rs b/vendor/sharded-slab/src/clear.rs new file mode 100644 index 000000000..1eb88b460 --- /dev/null +++ b/vendor/sharded-slab/src/clear.rs @@ -0,0 +1,100 @@ +use std::{collections, hash, ops::DerefMut, sync}; + +/// Trait implemented by types which can be cleared in place, retaining any +/// allocated memory. +/// +/// This is essentially a generalization of methods on standard library +/// collection types, including as [`Vec::clear`], [`String::clear`], and +/// [`HashMap::clear`]. These methods drop all data stored in the collection, +/// but retain the collection's heap allocation for future use. Types such as +/// `BTreeMap`, whose `clear` methods drops allocations, should not +/// implement this trait. +/// +/// When implemented for types which do not own a heap allocation, `Clear` +/// should reset the type in place if possible. If the type has an empty state +/// or stores `Option`s, those values should be reset to the empty state. For +/// "plain old data" types, which hold no pointers to other data and do not have +/// an empty or initial state, it's okay for a `Clear` implementation to be a +/// no-op. In that case, it essentially serves as a marker indicating that the +/// type may be reused to store new data. +/// +/// [`Vec::clear`]: https://doc.rust-lang.org/stable/std/vec/struct.Vec.html#method.clear +/// [`String::clear`]: https://doc.rust-lang.org/stable/std/string/struct.String.html#method.clear +/// [`HashMap::clear`]: https://doc.rust-lang.org/stable/std/collections/struct.HashMap.html#method.clear +pub trait Clear { + /// Clear all data in `self`, retaining the allocated capacithy. + fn clear(&mut self); +} + +impl<T> Clear for Option<T> { + fn clear(&mut self) { + let _ = self.take(); + } +} + +impl<T> Clear for Box<T> +where + T: Clear, +{ + #[inline] + fn clear(&mut self) { + self.deref_mut().clear() + } +} + +impl<T> Clear for Vec<T> { + #[inline] + fn clear(&mut self) { + Vec::clear(self) + } +} + +impl<K, V, S> Clear for collections::HashMap<K, V, S> +where + K: hash::Hash + Eq, + S: hash::BuildHasher, +{ + #[inline] + fn clear(&mut self) { + collections::HashMap::clear(self) + } +} + +impl<T, S> Clear for collections::HashSet<T, S> +where + T: hash::Hash + Eq, + S: hash::BuildHasher, +{ + #[inline] + fn clear(&mut self) { + collections::HashSet::clear(self) + } +} + +impl Clear for String { + #[inline] + fn clear(&mut self) { + String::clear(self) + } +} + +impl<T: Clear> Clear for sync::Mutex<T> { + #[inline] + fn clear(&mut self) { + self.get_mut().unwrap().clear(); + } +} + +impl<T: Clear> Clear for sync::RwLock<T> { + #[inline] + fn clear(&mut self) { + self.write().unwrap().clear(); + } +} + +#[cfg(all(loom, test))] +impl<T: Clear> Clear for crate::sync::alloc::Track<T> { + fn clear(&mut self) { + self.get_mut().clear() + } +} diff --git a/vendor/sharded-slab/src/implementation.rs b/vendor/sharded-slab/src/implementation.rs new file mode 100644 index 000000000..01f08a582 --- /dev/null +++ b/vendor/sharded-slab/src/implementation.rs @@ -0,0 +1,138 @@ +// This module exists only to provide a separate page for the implementation +// documentation. + +//! Notes on `sharded-slab`'s implementation and design. +//! +//! # Design +//! +//! The sharded slab's design is strongly inspired by the ideas presented by +//! Leijen, Zorn, and de Moura in [Mimalloc: Free List Sharding in +//! Action][mimalloc]. In this report, the authors present a novel design for a +//! memory allocator based on a concept of _free list sharding_. +//! +//! Memory allocators must keep track of what memory regions are not currently +//! allocated ("free") in order to provide them to future allocation requests. +//! The term [_free list_][freelist] refers to a technique for performing this +//! bookkeeping, where each free block stores a pointer to the next free block, +//! forming a linked list. The memory allocator keeps a pointer to the most +//! recently freed block, the _head_ of the free list. To allocate more memory, +//! the allocator pops from the free list by setting the head pointer to the +//! next free block of the current head block, and returning the previous head. +//! To deallocate a block, the block is pushed to the free list by setting its +//! first word to the current head pointer, and the head pointer is set to point +//! to the deallocated block. Most implementations of slab allocators backed by +//! arrays or vectors use a similar technique, where pointers are replaced by +//! indices into the backing array. +//! +//! When allocations and deallocations can occur concurrently across threads, +//! they must synchronize accesses to the free list; either by putting the +//! entire allocator state inside of a lock, or by using atomic operations to +//! treat the free list as a lock-free structure (such as a [Treiber stack]). In +//! both cases, there is a significant performance cost — even when the free +//! list is lock-free, it is likely that a noticeable amount of time will be +//! spent in compare-and-swap loops. Ideally, the global synchronzation point +//! created by the single global free list could be avoided as much as possible. +//! +//! The approach presented by Leijen, Zorn, and de Moura is to introduce +//! sharding and thus increase the granularity of synchronization significantly. +//! In mimalloc, the heap is _sharded_ so that each thread has its own +//! thread-local heap. Objects are always allocated from the local heap of the +//! thread where the allocation is performed. Because allocations are always +//! done from a thread's local heap, they need not be synchronized. +//! +//! However, since objects can move between threads before being deallocated, +//! _deallocations_ may still occur concurrently. Therefore, Leijen et al. +//! introduce a concept of _local_ and _global_ free lists. When an object is +//! deallocated on the same thread it was originally allocated on, it is placed +//! on the local free list; if it is deallocated on another thread, it goes on +//! the global free list for the heap of the thread from which it originated. To +//! allocate, the local free list is used first; if it is empty, the entire +//! global free list is popped onto the local free list. Since the local free +//! list is only ever accessed by the thread it belongs to, it does not require +//! synchronization at all, and because the global free list is popped from +//! infrequently, the cost of synchronization has a reduced impact. A majority +//! of allocations can occur without any synchronization at all; and +//! deallocations only require synchronization when an object has left its +//! parent thread (a relatively uncommon case). +//! +//! [mimalloc]: https://www.microsoft.com/en-us/research/uploads/prod/2019/06/mimalloc-tr-v1.pdf +//! [freelist]: https://en.wikipedia.org/wiki/Free_list +//! [Treiber stack]: https://en.wikipedia.org/wiki/Treiber_stack +//! +//! # Implementation +//! +//! A slab is represented as an array of [`MAX_THREADS`] _shards_. A shard +//! consists of a vector of one or more _pages_ plus associated metadata. +//! Finally, a page consists of an array of _slots_, head indices for the local +//! and remote free lists. +//! +//! ```text +//! ┌─────────────┐ +//! │ shard 1 │ +//! │ │ ┌─────────────┐ ┌────────┐ +//! │ pages───────┼───▶│ page 1 │ │ │ +//! ├─────────────┤ ├─────────────┤ ┌────▶│ next──┼─┐ +//! │ shard 2 │ │ page 2 │ │ ├────────┤ │ +//! ├─────────────┤ │ │ │ │XXXXXXXX│ │ +//! │ shard 3 │ │ local_head──┼──┘ ├────────┤ │ +//! └─────────────┘ │ remote_head─┼──┐ │ │◀┘ +//! ... ├─────────────┤ │ │ next──┼─┐ +//! ┌─────────────┐ │ page 3 │ │ ├────────┤ │ +//! │ shard n │ └─────────────┘ │ │XXXXXXXX│ │ +//! └─────────────┘ ... │ ├────────┤ │ +//! ┌─────────────┐ │ │XXXXXXXX│ │ +//! │ page n │ │ ├────────┤ │ +//! └─────────────┘ │ │ │◀┘ +//! └────▶│ next──┼───▶ ... +//! ├────────┤ +//! │XXXXXXXX│ +//! └────────┘ +//! ``` +//! +//! +//! The size of the first page in a shard is always a power of two, and every +//! subsequent page added after the first is twice as large as the page that +//! preceeds it. +//! +//! ```text +//! +//! pg. +//! ┌───┐ ┌─┬─┐ +//! │ 0 │───▶ │ │ +//! ├───┤ ├─┼─┼─┬─┐ +//! │ 1 │───▶ │ │ │ │ +//! ├───┤ ├─┼─┼─┼─┼─┬─┬─┬─┐ +//! │ 2 │───▶ │ │ │ │ │ │ │ │ +//! ├───┤ ├─┼─┼─┼─┼─┼─┼─┼─┼─┬─┬─┬─┬─┬─┬─┬─┐ +//! │ 3 │───▶ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ +//! └───┘ └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘ +//! ``` +//! +//! When searching for a free slot, the smallest page is searched first, and if +//! it is full, the search proceeds to the next page until either a free slot is +//! found or all available pages have been searched. If all available pages have +//! been searched and the maximum number of pages has not yet been reached, a +//! new page is then allocated. +//! +//! Since every page is twice as large as the previous page, and all page sizes +//! are powers of two, we can determine the page index that contains a given +//! address by shifting the address down by the smallest page size and +//! looking at how many twos places necessary to represent that number, +//! telling us what power of two page size it fits inside of. We can +//! determine the number of twos places by counting the number of leading +//! zeros (unused twos places) in the number's binary representation, and +//! subtracting that count from the total number of bits in a word. +//! +//! The formula for determining the page number that contains an offset is thus: +//! +//! ```rust,ignore +//! WIDTH - ((offset + INITIAL_PAGE_SIZE) >> INDEX_SHIFT).leading_zeros() +//! ``` +//! +//! where `WIDTH` is the number of bits in a `usize`, and `INDEX_SHIFT` is +//! +//! ```rust,ignore +//! INITIAL_PAGE_SIZE.trailing_zeros() + 1; +//! ``` +//! +//! [`MAX_THREADS`]: https://docs.rs/sharded-slab/latest/sharded_slab/trait.Config.html#associatedconstant.MAX_THREADS diff --git a/vendor/sharded-slab/src/iter.rs b/vendor/sharded-slab/src/iter.rs new file mode 100644 index 000000000..54189aa57 --- /dev/null +++ b/vendor/sharded-slab/src/iter.rs @@ -0,0 +1,39 @@ +use crate::{page, shard}; +use std::slice; + +#[derive(Debug)] +pub struct UniqueIter<'a, T, C: crate::cfg::Config> { + pub(super) shards: shard::IterMut<'a, Option<T>, C>, + pub(super) pages: slice::Iter<'a, page::Shared<Option<T>, C>>, + pub(super) slots: Option<page::Iter<'a, T, C>>, +} + +impl<'a, T, C: crate::cfg::Config> Iterator for UniqueIter<'a, T, C> { + type Item = &'a T; + fn next(&mut self) -> Option<Self::Item> { + test_println!("UniqueIter::next"); + loop { + test_println!("-> try next slot"); + if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) { + test_println!("-> found an item!"); + return Some(item); + } + + test_println!("-> try next page"); + if let Some(page) = self.pages.next() { + test_println!("-> found another page"); + self.slots = page.iter(); + continue; + } + + test_println!("-> try next shard"); + if let Some(shard) = self.shards.next() { + test_println!("-> found another shard"); + self.pages = shard.iter(); + } else { + test_println!("-> all done!"); + return None; + } + } + } +} diff --git a/vendor/sharded-slab/src/lib.rs b/vendor/sharded-slab/src/lib.rs new file mode 100644 index 000000000..e57cf50e2 --- /dev/null +++ b/vendor/sharded-slab/src/lib.rs @@ -0,0 +1,1092 @@ +//! A lock-free concurrent slab. +//! +//! Slabs provide pre-allocated storage for many instances of a single data +//! type. When a large number of values of a single type are required, +//! this can be more efficient than allocating each item individually. Since the +//! allocated items are the same size, memory fragmentation is reduced, and +//! creating and removing new items can be very cheap. +//! +//! This crate implements a lock-free concurrent slab, indexed by `usize`s. +//! +//! ## Usage +//! +//! First, add this to your `Cargo.toml`: +//! +//! ```toml +//! sharded-slab = "0.1.1" +//! ``` +//! +//! This crate provides two types, [`Slab`] and [`Pool`], which provide +//! slightly different APIs for using a sharded slab. +//! +//! [`Slab`] implements a slab for _storing_ small types, sharing them between +//! threads, and accessing them by index. New entries are allocated by +//! [inserting] data, moving it in by value. Similarly, entries may be +//! deallocated by [taking] from the slab, moving the value out. This API is +//! similar to a `Vec<Option<T>>`, but allowing lock-free concurrent insertion +//! and removal. +//! +//! In contrast, the [`Pool`] type provides an [object pool] style API for +//! _reusing storage_. Rather than constructing values and moving them into the +//! pool, as with [`Slab`], [allocating an entry][create] from the pool takes a +//! closure that's provided with a mutable reference to initialize the entry in +//! place. When entries are deallocated, they are [cleared] in place. Types +//! which own a heap allocation can be cleared by dropping any _data_ they +//! store, but retaining any previously-allocated capacity. This means that a +//! [`Pool`] may be used to reuse a set of existing heap allocations, reducing +//! allocator load. +//! +//! [inserting]: Slab::insert +//! [taking]: Slab::take +//! [create]: Pool::create +//! [cleared]: Clear +//! [object pool]: https://en.wikipedia.org/wiki/Object_pool_pattern +//! +//! # Examples +//! +//! Inserting an item into the slab, returning an index: +//! ```rust +//! # use sharded_slab::Slab; +//! let slab = Slab::new(); +//! +//! let key = slab.insert("hello world").unwrap(); +//! assert_eq!(slab.get(key).unwrap(), "hello world"); +//! ``` +//! +//! To share a slab across threads, it may be wrapped in an `Arc`: +//! ```rust +//! # use sharded_slab::Slab; +//! use std::sync::Arc; +//! let slab = Arc::new(Slab::new()); +//! +//! let slab2 = slab.clone(); +//! let thread2 = std::thread::spawn(move || { +//! let key = slab2.insert("hello from thread two").unwrap(); +//! assert_eq!(slab2.get(key).unwrap(), "hello from thread two"); +//! key +//! }); +//! +//! let key1 = slab.insert("hello from thread one").unwrap(); +//! assert_eq!(slab.get(key1).unwrap(), "hello from thread one"); +//! +//! // Wait for thread 2 to complete. +//! let key2 = thread2.join().unwrap(); +//! +//! // The item inserted by thread 2 remains in the slab. +//! assert_eq!(slab.get(key2).unwrap(), "hello from thread two"); +//!``` +//! +//! If items in the slab must be mutated, a `Mutex` or `RwLock` may be used for +//! each item, providing granular locking of items rather than of the slab: +//! +//! ```rust +//! # use sharded_slab::Slab; +//! use std::sync::{Arc, Mutex}; +//! let slab = Arc::new(Slab::new()); +//! +//! let key = slab.insert(Mutex::new(String::from("hello world"))).unwrap(); +//! +//! let slab2 = slab.clone(); +//! let thread2 = std::thread::spawn(move || { +//! let hello = slab2.get(key).expect("item missing"); +//! let mut hello = hello.lock().expect("mutex poisoned"); +//! *hello = String::from("hello everyone!"); +//! }); +//! +//! thread2.join().unwrap(); +//! +//! let hello = slab.get(key).expect("item missing"); +//! let mut hello = hello.lock().expect("mutex poisoned"); +//! assert_eq!(hello.as_str(), "hello everyone!"); +//! ``` +//! +//! # Configuration +//! +//! For performance reasons, several values used by the slab are calculated as +//! constants. In order to allow users to tune the slab's parameters, we provide +//! a [`Config`] trait which defines these parameters as associated `consts`. +//! The `Slab` type is generic over a `C: Config` parameter. +//! +//! [`Config`]: trait.Config.html +//! +//! # Comparison with Similar Crates +//! +//! - [`slab`]: Carl Lerche's `slab` crate provides a slab implementation with a +//! similar API, implemented by storing all data in a single vector. +//! +//! Unlike `sharded_slab`, inserting and removing elements from the slab +//! requires mutable access. This means that if the slab is accessed +//! concurrently by multiple threads, it is necessary for it to be protected +//! by a `Mutex` or `RwLock`. Items may not be inserted or removed (or +//! accessed, if a `Mutex` is used) concurrently, even when they are +//! unrelated. In many cases, the lock can become a significant bottleneck. On +//! the other hand, this crate allows separate indices in the slab to be +//! accessed, inserted, and removed concurrently without requiring a global +//! lock. Therefore, when the slab is shared across multiple threads, this +//! crate offers significantly better performance than `slab`. +//! +//! However, the lock free slab introduces some additional constant-factor +//! overhead. This means that in use-cases where a slab is _not_ shared by +//! multiple threads and locking is not required, this crate will likely offer +//! slightly worse performance. +//! +//! In summary: `sharded-slab` offers significantly improved performance in +//! concurrent use-cases, while `slab` should be preferred in single-threaded +//! use-cases. +//! +//! [`slab`]: https://crates.io/crates/loom +//! +//! # Safety and Correctness +//! +//! Most implementations of lock-free data structures in Rust require some +//! amount of unsafe code, and this crate is not an exception. In order to catch +//! potential bugs in this unsafe code, we make use of [`loom`], a +//! permutation-testing tool for concurrent Rust programs. All `unsafe` blocks +//! this crate occur in accesses to `loom` `UnsafeCell`s. This means that when +//! those accesses occur in this crate's tests, `loom` will assert that they are +//! valid under the C11 memory model across multiple permutations of concurrent +//! executions of those tests. +//! +//! In order to guard against the [ABA problem][aba], this crate makes use of +//! _generational indices_. Each slot in the slab tracks a generation counter +//! which is incremented every time a value is inserted into that slot, and the +//! indices returned by [`Slab::insert`] include the generation of the slot when +//! the value was inserted, packed into the high-order bits of the index. This +//! ensures that if a value is inserted, removed, and a new value is inserted +//! into the same slot in the slab, the key returned by the first call to +//! `insert` will not map to the new value. +//! +//! Since a fixed number of bits are set aside to use for storing the generation +//! counter, the counter will wrap around after being incremented a number of +//! times. To avoid situations where a returned index lives long enough to see the +//! generation counter wrap around to the same value, it is good to be fairly +//! generous when configuring the allocation of index bits. +//! +//! [`loom`]: https://crates.io/crates/loom +//! [aba]: https://en.wikipedia.org/wiki/ABA_problem +//! [`Slab::insert`]: struct.Slab.html#method.insert +//! +//! # Performance +//! +//! These graphs were produced by [benchmarks] of the sharded slab implementation, +//! using the [`criterion`] crate. +//! +//! The first shows the results of a benchmark where an increasing number of +//! items are inserted and then removed into a slab concurrently by five +//! threads. It compares the performance of the sharded slab implementation +//! with a `RwLock<slab::Slab>`: +//! +//! <img width="1124" alt="Screen Shot 2019-10-01 at 5 09 49 PM" src="https://user-images.githubusercontent.com/2796466/66078398-cd6c9f80-e516-11e9-9923-0ed6292e8498.png"> +//! +//! The second graph shows the results of a benchmark where an increasing +//! number of items are inserted and then removed by a _single_ thread. It +//! compares the performance of the sharded slab implementation with an +//! `RwLock<slab::Slab>` and a `mut slab::Slab`. +//! +//! <img width="925" alt="Screen Shot 2019-10-01 at 5 13 45 PM" src="https://user-images.githubusercontent.com/2796466/66078469-f0974f00-e516-11e9-95b5-f65f0aa7e494.png"> +//! +//! These benchmarks demonstrate that, while the sharded approach introduces +//! a small constant-factor overhead, it offers significantly better +//! performance across concurrent accesses. +//! +//! [benchmarks]: https://github.com/hawkw/sharded-slab/blob/master/benches/bench.rs +//! [`criterion`]: https://crates.io/crates/criterion +//! +//! # Implementation Notes +//! +//! See [this page](crate::implementation) for details on this crate's design +//! and implementation. +//! +#![doc(html_root_url = "https://docs.rs/sharded-slab/0.1.4")] +#![warn(missing_debug_implementations, missing_docs)] +#![cfg_attr(docsrs, warn(rustdoc::broken_intra_doc_links))] +#[macro_use] +mod macros; + +pub mod implementation; +pub mod pool; + +pub(crate) mod cfg; +pub(crate) mod sync; + +mod clear; +mod iter; +mod page; +mod shard; +mod tid; + +pub use cfg::{Config, DefaultConfig}; +pub use clear::Clear; +#[doc(inline)] +pub use pool::Pool; + +pub(crate) use tid::Tid; + +use cfg::CfgPrivate; +use shard::Shard; +use std::{fmt, marker::PhantomData, ptr, sync::Arc}; + +/// A sharded slab. +/// +/// See the [crate-level documentation](crate) for details on using this type. +pub struct Slab<T, C: cfg::Config = DefaultConfig> { + shards: shard::Array<Option<T>, C>, + _cfg: PhantomData<C>, +} + +/// A handle that allows access to an occupied entry in a [`Slab`]. +/// +/// While the guard exists, it indicates to the slab that the item the guard +/// references is currently being accessed. If the item is removed from the slab +/// while a guard exists, the removal will be deferred until all guards are +/// dropped. +pub struct Entry<'a, T, C: cfg::Config = DefaultConfig> { + inner: page::slot::Guard<Option<T>, C>, + value: ptr::NonNull<T>, + shard: &'a Shard<Option<T>, C>, + key: usize, +} + +/// A handle to a vacant entry in a [`Slab`]. +/// +/// `VacantEntry` allows constructing values with the key that they will be +/// assigned to. +/// +/// # Examples +/// +/// ``` +/// # use sharded_slab::Slab; +/// let mut slab = Slab::new(); +/// +/// let hello = { +/// let entry = slab.vacant_entry().unwrap(); +/// let key = entry.key(); +/// +/// entry.insert((key, "hello")); +/// key +/// }; +/// +/// assert_eq!(hello, slab.get(hello).unwrap().0); +/// assert_eq!("hello", slab.get(hello).unwrap().1); +/// ``` +#[derive(Debug)] +pub struct VacantEntry<'a, T, C: cfg::Config = DefaultConfig> { + inner: page::slot::InitGuard<Option<T>, C>, + key: usize, + _lt: PhantomData<&'a ()>, +} + +/// An owned reference to an occupied entry in a [`Slab`]. +/// +/// While the guard exists, it indicates to the slab that the item the guard +/// references is currently being accessed. If the item is removed from the slab +/// while the guard exists, the removal will be deferred until all guards are +/// dropped. +/// +/// Unlike [`Entry`], which borrows the slab, an `OwnedEntry` clones the [`Arc`] +/// around the slab. Therefore, it keeps the slab from being dropped until all +/// such guards have been dropped. This means that an `OwnedEntry` may be held for +/// an arbitrary lifetime. +/// +/// # Examples +/// +/// ``` +/// # use sharded_slab::Slab; +/// use std::sync::Arc; +/// +/// let slab: Arc<Slab<&'static str>> = Arc::new(Slab::new()); +/// let key = slab.insert("hello world").unwrap(); +/// +/// // Look up the created key, returning an `OwnedEntry`. +/// let value = slab.clone().get_owned(key).unwrap(); +/// +/// // Now, the original `Arc` clone of the slab may be dropped, but the +/// // returned `OwnedEntry` can still access the value. +/// assert_eq!(value, "hello world"); +/// ``` +/// +/// Unlike [`Entry`], an `OwnedEntry` may be stored in a struct which must live +/// for the `'static` lifetime: +/// +/// ``` +/// # use sharded_slab::Slab; +/// use sharded_slab::OwnedEntry; +/// use std::sync::Arc; +/// +/// pub struct MyStruct { +/// entry: OwnedEntry<&'static str>, +/// // ... other fields ... +/// } +/// +/// // Suppose this is some arbitrary function which requires a value that +/// // lives for the 'static lifetime... +/// fn function_requiring_static<T: 'static>(t: &T) { +/// // ... do something extremely important and interesting ... +/// } +/// +/// let slab: Arc<Slab<&'static str>> = Arc::new(Slab::new()); +/// let key = slab.insert("hello world").unwrap(); +/// +/// // Look up the created key, returning an `OwnedEntry`. +/// let entry = slab.clone().get_owned(key).unwrap(); +/// let my_struct = MyStruct { +/// entry, +/// // ... +/// }; +/// +/// // We can use `my_struct` anywhere where it is required to have the +/// // `'static` lifetime: +/// function_requiring_static(&my_struct); +/// ``` +/// +/// `OwnedEntry`s may be sent between threads: +/// +/// ``` +/// # use sharded_slab::Slab; +/// use std::{thread, sync::Arc}; +/// +/// let slab: Arc<Slab<&'static str>> = Arc::new(Slab::new()); +/// let key = slab.insert("hello world").unwrap(); +/// +/// // Look up the created key, returning an `OwnedEntry`. +/// let value = slab.clone().get_owned(key).unwrap(); +/// +/// thread::spawn(move || { +/// assert_eq!(value, "hello world"); +/// // ... +/// }).join().unwrap(); +/// ``` +/// +/// [`get`]: Slab::get +/// [`Arc`]: std::sync::Arc +pub struct OwnedEntry<T, C = DefaultConfig> +where + C: cfg::Config, +{ + inner: page::slot::Guard<Option<T>, C>, + value: ptr::NonNull<T>, + slab: Arc<Slab<T, C>>, + key: usize, +} + +impl<T> Slab<T> { + /// Returns a new slab with the default configuration parameters. + pub fn new() -> Self { + Self::new_with_config() + } + + /// Returns a new slab with the provided configuration parameters. + pub fn new_with_config<C: cfg::Config>() -> Slab<T, C> { + C::validate(); + Slab { + shards: shard::Array::new(), + _cfg: PhantomData, + } + } +} + +impl<T, C: cfg::Config> Slab<T, C> { + /// The number of bits in each index which are used by the slab. + /// + /// If other data is packed into the `usize` indices returned by + /// [`Slab::insert`], user code is free to use any bits higher than the + /// `USED_BITS`-th bit freely. + /// + /// This is determined by the [`Config`] type that configures the slab's + /// parameters. By default, all bits are used; this can be changed by + /// overriding the [`Config::RESERVED_BITS`][res] constant. + /// + /// [res]: crate::Config#RESERVED_BITS + pub const USED_BITS: usize = C::USED_BITS; + + /// Inserts a value into the slab, returning the integer index at which that + /// value was inserted. This index can then be used to access the entry. + /// + /// If this function returns `None`, then the shard for the current thread + /// is full and no items can be added until some are removed, or the maximum + /// number of shards has been reached. + /// + /// # Examples + /// ```rust + /// # use sharded_slab::Slab; + /// let slab = Slab::new(); + /// + /// let key = slab.insert("hello world").unwrap(); + /// assert_eq!(slab.get(key).unwrap(), "hello world"); + /// ``` + pub fn insert(&self, value: T) -> Option<usize> { + let (tid, shard) = self.shards.current(); + test_println!("insert {:?}", tid); + let mut value = Some(value); + shard + .init_with(|idx, slot| { + let gen = slot.insert(&mut value)?; + Some(gen.pack(idx)) + }) + .map(|idx| tid.pack(idx)) + } + + /// Return a handle to a vacant entry allowing for further manipulation. + /// + /// This function is useful when creating values that must contain their + /// slab index. The returned [`VacantEntry`] reserves a slot in the slab and + /// is able to return the index of the entry. + /// + /// # Examples + /// + /// ``` + /// # use sharded_slab::Slab; + /// let mut slab = Slab::new(); + /// + /// let hello = { + /// let entry = slab.vacant_entry().unwrap(); + /// let key = entry.key(); + /// + /// entry.insert((key, "hello")); + /// key + /// }; + /// + /// assert_eq!(hello, slab.get(hello).unwrap().0); + /// assert_eq!("hello", slab.get(hello).unwrap().1); + /// ``` + pub fn vacant_entry(&self) -> Option<VacantEntry<'_, T, C>> { + let (tid, shard) = self.shards.current(); + test_println!("vacant_entry {:?}", tid); + shard.init_with(|idx, slot| { + let inner = slot.init()?; + let key = inner.generation().pack(tid.pack(idx)); + Some(VacantEntry { + inner, + key, + _lt: PhantomData, + }) + }) + } + + /// Remove the value at the given index in the slab, returning `true` if a + /// value was removed. + /// + /// Unlike [`take`], this method does _not_ block the current thread until + /// the value can be removed. Instead, if another thread is currently + /// accessing that value, this marks it to be removed by that thread when it + /// finishes accessing the value. + /// + /// # Examples + /// + /// ```rust + /// let slab = sharded_slab::Slab::new(); + /// let key = slab.insert("hello world").unwrap(); + /// + /// // Remove the item from the slab. + /// assert!(slab.remove(key)); + /// + /// // Now, the slot is empty. + /// assert!(!slab.contains(key)); + /// ``` + /// + /// ```rust + /// use std::sync::Arc; + /// + /// let slab = Arc::new(sharded_slab::Slab::new()); + /// let key = slab.insert("hello world").unwrap(); + /// + /// let slab2 = slab.clone(); + /// let thread2 = std::thread::spawn(move || { + /// // Depending on when this thread begins executing, the item may + /// // or may not have already been removed... + /// if let Some(item) = slab2.get(key) { + /// assert_eq!(item, "hello world"); + /// } + /// }); + /// + /// // The item will be removed by thread2 when it finishes accessing it. + /// assert!(slab.remove(key)); + /// + /// thread2.join().unwrap(); + /// assert!(!slab.contains(key)); + /// ``` + /// [`take`]: Slab::take + pub fn remove(&self, idx: usize) -> bool { + // The `Drop` impl for `Entry` calls `remove_local` or `remove_remote` based + // on where the guard was dropped from. If the dropped guard was the last one, this will + // call `Slot::remove_value` which actually clears storage. + let tid = C::unpack_tid(idx); + + test_println!("rm_deferred {:?}", tid); + let shard = self.shards.get(tid.as_usize()); + if tid.is_current() { + shard.map(|shard| shard.remove_local(idx)).unwrap_or(false) + } else { + shard.map(|shard| shard.remove_remote(idx)).unwrap_or(false) + } + } + + /// Removes the value associated with the given key from the slab, returning + /// it. + /// + /// If the slab does not contain a value for that key, `None` is returned + /// instead. + /// + /// If the value associated with the given key is currently being + /// accessed by another thread, this method will block the current thread + /// until the item is no longer accessed. If this is not desired, use + /// [`remove`] instead. + /// + /// **Note**: This method blocks the calling thread by spinning until the + /// currently outstanding references are released. Spinning for long periods + /// of time can result in high CPU time and power consumption. Therefore, + /// `take` should only be called when other references to the slot are + /// expected to be dropped soon (e.g., when all accesses are relatively + /// short). + /// + /// # Examples + /// + /// ```rust + /// let slab = sharded_slab::Slab::new(); + /// let key = slab.insert("hello world").unwrap(); + /// + /// // Remove the item from the slab, returning it. + /// assert_eq!(slab.take(key), Some("hello world")); + /// + /// // Now, the slot is empty. + /// assert!(!slab.contains(key)); + /// ``` + /// + /// ```rust + /// use std::sync::Arc; + /// + /// let slab = Arc::new(sharded_slab::Slab::new()); + /// let key = slab.insert("hello world").unwrap(); + /// + /// let slab2 = slab.clone(); + /// let thread2 = std::thread::spawn(move || { + /// // Depending on when this thread begins executing, the item may + /// // or may not have already been removed... + /// if let Some(item) = slab2.get(key) { + /// assert_eq!(item, "hello world"); + /// } + /// }); + /// + /// // The item will only be removed when the other thread finishes + /// // accessing it. + /// assert_eq!(slab.take(key), Some("hello world")); + /// + /// thread2.join().unwrap(); + /// assert!(!slab.contains(key)); + /// ``` + /// [`remove`]: Slab::remove + pub fn take(&self, idx: usize) -> Option<T> { + let tid = C::unpack_tid(idx); + + test_println!("rm {:?}", tid); + let shard = self.shards.get(tid.as_usize())?; + if tid.is_current() { + shard.take_local(idx) + } else { + shard.take_remote(idx) + } + } + + /// Return a reference to the value associated with the given key. + /// + /// If the slab does not contain a value for the given key, or if the + /// maximum number of concurrent references to the slot has been reached, + /// `None` is returned instead. + /// + /// # Examples + /// + /// ```rust + /// let slab = sharded_slab::Slab::new(); + /// let key = slab.insert("hello world").unwrap(); + /// + /// assert_eq!(slab.get(key).unwrap(), "hello world"); + /// assert!(slab.get(12345).is_none()); + /// ``` + pub fn get(&self, key: usize) -> Option<Entry<'_, T, C>> { + let tid = C::unpack_tid(key); + + test_println!("get {:?}; current={:?}", tid, Tid::<C>::current()); + let shard = self.shards.get(tid.as_usize())?; + shard.with_slot(key, |slot| { + let inner = slot.get(C::unpack_gen(key))?; + let value = ptr::NonNull::from(slot.value().as_ref().unwrap()); + Some(Entry { + inner, + value, + shard, + key, + }) + }) + } + + /// Return an owned reference to the value at the given index. + /// + /// If the slab does not contain a value for the given key, `None` is + /// returned instead. + /// + /// Unlike [`get`], which borrows the slab, this method _clones_ the [`Arc`] + /// around the slab. This means that the returned [`OwnedEntry`] can be held + /// for an arbitrary lifetime. However, this method requires that the slab + /// itself be wrapped in an `Arc`. + /// + /// # Examples + /// + /// ``` + /// # use sharded_slab::Slab; + /// use std::sync::Arc; + /// + /// let slab: Arc<Slab<&'static str>> = Arc::new(Slab::new()); + /// let key = slab.insert("hello world").unwrap(); + /// + /// // Look up the created key, returning an `OwnedEntry`. + /// let value = slab.clone().get_owned(key).unwrap(); + /// + /// // Now, the original `Arc` clone of the slab may be dropped, but the + /// // returned `OwnedEntry` can still access the value. + /// assert_eq!(value, "hello world"); + /// ``` + /// + /// Unlike [`Entry`], an `OwnedEntry` may be stored in a struct which must live + /// for the `'static` lifetime: + /// + /// ``` + /// # use sharded_slab::Slab; + /// use sharded_slab::OwnedEntry; + /// use std::sync::Arc; + /// + /// pub struct MyStruct { + /// entry: OwnedEntry<&'static str>, + /// // ... other fields ... + /// } + /// + /// // Suppose this is some arbitrary function which requires a value that + /// // lives for the 'static lifetime... + /// fn function_requiring_static<T: 'static>(t: &T) { + /// // ... do something extremely important and interesting ... + /// } + /// + /// let slab: Arc<Slab<&'static str>> = Arc::new(Slab::new()); + /// let key = slab.insert("hello world").unwrap(); + /// + /// // Look up the created key, returning an `OwnedEntry`. + /// let entry = slab.clone().get_owned(key).unwrap(); + /// let my_struct = MyStruct { + /// entry, + /// // ... + /// }; + /// + /// // We can use `my_struct` anywhere where it is required to have the + /// // `'static` lifetime: + /// function_requiring_static(&my_struct); + /// ``` + /// + /// [`OwnedEntry`]s may be sent between threads: + /// + /// ``` + /// # use sharded_slab::Slab; + /// use std::{thread, sync::Arc}; + /// + /// let slab: Arc<Slab<&'static str>> = Arc::new(Slab::new()); + /// let key = slab.insert("hello world").unwrap(); + /// + /// // Look up the created key, returning an `OwnedEntry`. + /// let value = slab.clone().get_owned(key).unwrap(); + /// + /// thread::spawn(move || { + /// assert_eq!(value, "hello world"); + /// // ... + /// }).join().unwrap(); + /// ``` + /// + /// [`get`]: Slab::get + /// [`Arc`]: std::sync::Arc + pub fn get_owned(self: Arc<Self>, key: usize) -> Option<OwnedEntry<T, C>> { + let tid = C::unpack_tid(key); + + test_println!("get_owned {:?}; current={:?}", tid, Tid::<C>::current()); + let shard = self.shards.get(tid.as_usize())?; + shard.with_slot(key, |slot| { + let inner = slot.get(C::unpack_gen(key))?; + let value = ptr::NonNull::from(slot.value().as_ref().unwrap()); + Some(OwnedEntry { + inner, + value, + slab: self.clone(), + key, + }) + }) + } + + /// Returns `true` if the slab contains a value for the given key. + /// + /// # Examples + /// + /// ``` + /// let slab = sharded_slab::Slab::new(); + /// + /// let key = slab.insert("hello world").unwrap(); + /// assert!(slab.contains(key)); + /// + /// slab.take(key).unwrap(); + /// assert!(!slab.contains(key)); + /// ``` + pub fn contains(&self, key: usize) -> bool { + self.get(key).is_some() + } + + /// Returns an iterator over all the items in the slab. + pub fn unique_iter(&mut self) -> iter::UniqueIter<'_, T, C> { + let mut shards = self.shards.iter_mut(); + let shard = shards.next().expect("must be at least 1 shard"); + let mut pages = shard.iter(); + let slots = pages.next().and_then(page::Shared::iter); + iter::UniqueIter { + shards, + slots, + pages, + } + } +} + +impl<T> Default for Slab<T> { + fn default() -> Self { + Self::new() + } +} + +impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Slab<T, C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Slab") + .field("shards", &self.shards) + .field("config", &C::debug()) + .finish() + } +} + +unsafe impl<T: Send, C: cfg::Config> Send for Slab<T, C> {} +unsafe impl<T: Sync, C: cfg::Config> Sync for Slab<T, C> {} + +// === impl Entry === + +impl<'a, T, C: cfg::Config> Entry<'a, T, C> { + /// Returns the key used to access the guard. + pub fn key(&self) -> usize { + self.key + } + + #[inline(always)] + fn value(&self) -> &T { + unsafe { + // Safety: this is always going to be valid, as it's projected from + // the safe reference to `self.value` --- this is just to avoid + // having to `expect` an option in the hot path when dereferencing. + self.value.as_ref() + } + } +} + +impl<'a, T, C: cfg::Config> std::ops::Deref for Entry<'a, T, C> { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +impl<'a, T, C: cfg::Config> Drop for Entry<'a, T, C> { + fn drop(&mut self) { + let should_remove = unsafe { + // Safety: calling `slot::Guard::release` is unsafe, since the + // `Guard` value contains a pointer to the slot that may outlive the + // slab containing that slot. Here, the `Entry` guard owns a + // borrowed reference to the shard containing that slot, which + // ensures that the slot will not be dropped while this `Guard` + // exists. + self.inner.release() + }; + if should_remove { + self.shard.clear_after_release(self.key) + } + } +} + +impl<'a, T, C> fmt::Debug for Entry<'a, T, C> +where + T: fmt::Debug, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self.value(), f) + } +} + +impl<'a, T, C> PartialEq<T> for Entry<'a, T, C> +where + T: PartialEq<T>, + C: cfg::Config, +{ + fn eq(&self, other: &T) -> bool { + self.value().eq(other) + } +} + +// === impl VacantEntry === + +impl<'a, T, C: cfg::Config> VacantEntry<'a, T, C> { + /// Insert a value in the entry. + /// + /// To get the integer index at which this value will be inserted, use + /// [`key`] prior to calling `insert`. + /// + /// # Examples + /// + /// ``` + /// # use sharded_slab::Slab; + /// let mut slab = Slab::new(); + /// + /// let hello = { + /// let entry = slab.vacant_entry().unwrap(); + /// let key = entry.key(); + /// + /// entry.insert((key, "hello")); + /// key + /// }; + /// + /// assert_eq!(hello, slab.get(hello).unwrap().0); + /// assert_eq!("hello", slab.get(hello).unwrap().1); + /// ``` + /// + /// [`key`]: VacantEntry::key + pub fn insert(mut self, val: T) { + let value = unsafe { + // Safety: this `VacantEntry` only lives as long as the `Slab` it was + // borrowed from, so it cannot outlive the entry's slot. + self.inner.value_mut() + }; + debug_assert!( + value.is_none(), + "tried to insert to a slot that already had a value!" + ); + *value = Some(val); + let _released = unsafe { + // Safety: again, this `VacantEntry` only lives as long as the + // `Slab` it was borrowed from, so it cannot outlive the entry's + // slot. + self.inner.release() + }; + debug_assert!( + !_released, + "removing a value before it was inserted should be a no-op" + ) + } + + /// Return the integer index at which this entry will be inserted. + /// + /// A value stored in this entry will be associated with this key. + /// + /// # Examples + /// + /// ``` + /// # use sharded_slab::*; + /// let mut slab = Slab::new(); + /// + /// let hello = { + /// let entry = slab.vacant_entry().unwrap(); + /// let key = entry.key(); + /// + /// entry.insert((key, "hello")); + /// key + /// }; + /// + /// assert_eq!(hello, slab.get(hello).unwrap().0); + /// assert_eq!("hello", slab.get(hello).unwrap().1); + /// ``` + pub fn key(&self) -> usize { + self.key + } +} +// === impl OwnedEntry === + +impl<T, C> OwnedEntry<T, C> +where + C: cfg::Config, +{ + /// Returns the key used to access this guard + pub fn key(&self) -> usize { + self.key + } + + #[inline(always)] + fn value(&self) -> &T { + unsafe { + // Safety: this is always going to be valid, as it's projected from + // the safe reference to `self.value` --- this is just to avoid + // having to `expect` an option in the hot path when dereferencing. + self.value.as_ref() + } + } +} + +impl<T, C> std::ops::Deref for OwnedEntry<T, C> +where + C: cfg::Config, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +impl<T, C> Drop for OwnedEntry<T, C> +where + C: cfg::Config, +{ + fn drop(&mut self) { + test_println!("drop OwnedEntry: try clearing data"); + let should_clear = unsafe { + // Safety: calling `slot::Guard::release` is unsafe, since the + // `Guard` value contains a pointer to the slot that may outlive the + // slab containing that slot. Here, the `OwnedEntry` owns an `Arc` + // clone of the pool, which keeps it alive as long as the `OwnedEntry` + // exists. + self.inner.release() + }; + if should_clear { + let shard_idx = Tid::<C>::from_packed(self.key); + test_println!("-> shard={:?}", shard_idx); + if let Some(shard) = self.slab.shards.get(shard_idx.as_usize()) { + shard.clear_after_release(self.key) + } else { + test_println!("-> shard={:?} does not exist! THIS IS A BUG", shard_idx); + debug_assert!(std::thread::panicking(), "[internal error] tried to drop an `OwnedEntry` to a slot on a shard that never existed!"); + } + } + } +} + +impl<T, C> fmt::Debug for OwnedEntry<T, C> +where + T: fmt::Debug, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self.value(), f) + } +} + +impl<T, C> PartialEq<T> for OwnedEntry<T, C> +where + T: PartialEq<T>, + C: cfg::Config, +{ + fn eq(&self, other: &T) -> bool { + *self.value() == *other + } +} + +unsafe impl<T, C> Sync for OwnedEntry<T, C> +where + T: Sync, + C: cfg::Config, +{ +} + +unsafe impl<T, C> Send for OwnedEntry<T, C> +where + T: Sync, + C: cfg::Config, +{ +} + +// === pack === + +pub(crate) trait Pack<C: cfg::Config>: Sized { + // ====== provided by each implementation ================================= + + /// The number of bits occupied by this type when packed into a usize. + /// + /// This must be provided to determine the number of bits into which to pack + /// the type. + const LEN: usize; + /// The type packed on the less significant side of this type. + /// + /// If this type is packed into the least significant bit of a usize, this + /// should be `()`, which occupies no bytes. + /// + /// This is used to calculate the shift amount for packing this value. + type Prev: Pack<C>; + + // ====== calculated automatically ======================================== + + /// A number consisting of `Self::LEN` 1 bits, starting at the least + /// significant bit. + /// + /// This is the higest value this type can represent. This number is shifted + /// left by `Self::SHIFT` bits to calculate this type's `MASK`. + /// + /// This is computed automatically based on `Self::LEN`. + const BITS: usize = { + let shift = 1 << (Self::LEN - 1); + shift | (shift - 1) + }; + /// The number of bits to shift a number to pack it into a usize with other + /// values. + /// + /// This is caculated automatically based on the `LEN` and `SHIFT` constants + /// of the previous value. + const SHIFT: usize = Self::Prev::SHIFT + Self::Prev::LEN; + + /// The mask to extract only this type from a packed `usize`. + /// + /// This is calculated by shifting `Self::BITS` left by `Self::SHIFT`. + const MASK: usize = Self::BITS << Self::SHIFT; + + fn as_usize(&self) -> usize; + fn from_usize(val: usize) -> Self; + + #[inline(always)] + fn pack(&self, to: usize) -> usize { + let value = self.as_usize(); + debug_assert!(value <= Self::BITS); + + (to & !Self::MASK) | (value << Self::SHIFT) + } + + #[inline(always)] + fn from_packed(from: usize) -> Self { + let value = (from & Self::MASK) >> Self::SHIFT; + debug_assert!(value <= Self::BITS); + Self::from_usize(value) + } +} + +impl<C: cfg::Config> Pack<C> for () { + const BITS: usize = 0; + const LEN: usize = 0; + const SHIFT: usize = 0; + const MASK: usize = 0; + + type Prev = (); + + fn as_usize(&self) -> usize { + unreachable!() + } + fn from_usize(_val: usize) -> Self { + unreachable!() + } + + fn pack(&self, _to: usize) -> usize { + unreachable!() + } + + fn from_packed(_from: usize) -> Self { + unreachable!() + } +} + +#[cfg(test)] +pub(crate) use self::tests::util as test_util; + +#[cfg(test)] +mod tests; diff --git a/vendor/sharded-slab/src/macros.rs b/vendor/sharded-slab/src/macros.rs new file mode 100644 index 000000000..e431f64f1 --- /dev/null +++ b/vendor/sharded-slab/src/macros.rs @@ -0,0 +1,67 @@ +macro_rules! test_println { + ($($arg:tt)*) => { + if cfg!(test) && cfg!(slab_print) { + if std::thread::panicking() { + // getting the thread ID while panicking doesn't seem to play super nicely with loom's + // mock lazy_static... + println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*)) + } else { + println!("[{:?} {:>17}:{:<3}] {}", crate::Tid::<crate::DefaultConfig>::current(), file!(), line!(), format_args!($($arg)*)) + } + } + } +} + +#[cfg(all(test, loom))] +macro_rules! test_dbg { + ($e:expr) => { + match $e { + e => { + test_println!("{} = {:?}", stringify!($e), &e); + e + } + } + }; +} + +macro_rules! panic_in_drop { + ($($arg:tt)*) => { + if !std::thread::panicking() { + panic!($($arg)*) + } else { + let thread = std::thread::current(); + eprintln!( + "thread '{thread}' attempted to panic at '{msg}', {file}:{line}:{col}\n\ + note: we were already unwinding due to a previous panic.", + thread = thread.name().unwrap_or("<unnamed>"), + msg = format_args!($($arg)*), + file = file!(), + line = line!(), + col = column!(), + ); + } + } +} + +macro_rules! debug_assert_eq_in_drop { + ($this:expr, $that:expr) => { + debug_assert_eq_in_drop!(@inner $this, $that, "") + }; + ($this:expr, $that:expr, $($arg:tt)+) => { + debug_assert_eq_in_drop!(@inner $this, $that, format_args!(": {}", format_args!($($arg)+))) + }; + (@inner $this:expr, $that:expr, $msg:expr) => { + if cfg!(debug_assertions) { + if $this != $that { + panic_in_drop!( + "assertion failed ({} == {})\n left: `{:?}`,\n right: `{:?}`{}", + stringify!($this), + stringify!($that), + $this, + $that, + $msg, + ) + } + } + } +} diff --git a/vendor/sharded-slab/src/page/mod.rs b/vendor/sharded-slab/src/page/mod.rs new file mode 100644 index 000000000..0499fb535 --- /dev/null +++ b/vendor/sharded-slab/src/page/mod.rs @@ -0,0 +1,449 @@ +use crate::cfg::{self, CfgPrivate}; +use crate::clear::Clear; +use crate::sync::UnsafeCell; +use crate::Pack; + +pub(crate) mod slot; +mod stack; +pub(crate) use self::slot::Slot; +use std::{fmt, marker::PhantomData}; + +/// A page address encodes the location of a slot within a shard (the page +/// number and offset within that page) as a single linear value. +#[repr(transparent)] +pub(crate) struct Addr<C: cfg::Config = cfg::DefaultConfig> { + addr: usize, + _cfg: PhantomData<fn(C)>, +} + +impl<C: cfg::Config> Addr<C> { + const NULL: usize = Self::BITS + 1; + + pub(crate) fn index(self) -> usize { + // Since every page is twice as large as the previous page, and all page sizes + // are powers of two, we can determine the page index that contains a given + // address by counting leading zeros, which tells us what power of two + // the offset fits into. + // + // First, we must shift down to the smallest page size, so that the last + // offset on the first page becomes 0. + let shifted = (self.addr + C::INITIAL_SZ) >> C::ADDR_INDEX_SHIFT; + // Now, we can determine the number of twos places by counting the + // number of leading zeros (unused twos places) in the number's binary + // representation, and subtracting that count from the total number of bits in a word. + cfg::WIDTH - shifted.leading_zeros() as usize + } + + pub(crate) fn offset(self) -> usize { + self.addr + } +} + +pub(crate) trait FreeList<C> { + fn push<T>(&self, new_head: usize, slot: &Slot<T, C>) + where + C: cfg::Config; +} + +impl<C: cfg::Config> Pack<C> for Addr<C> { + const LEN: usize = C::MAX_PAGES + C::ADDR_INDEX_SHIFT; + + type Prev = (); + + fn as_usize(&self) -> usize { + self.addr + } + + fn from_usize(addr: usize) -> Self { + debug_assert!(addr <= Self::BITS); + Self { + addr, + _cfg: PhantomData, + } + } +} + +pub(crate) type Iter<'a, T, C> = std::iter::FilterMap< + std::slice::Iter<'a, Slot<Option<T>, C>>, + fn(&'a Slot<Option<T>, C>) -> Option<&'a T>, +>; + +pub(crate) struct Local { + /// Index of the first slot on the local free list + head: UnsafeCell<usize>, +} + +pub(crate) struct Shared<T, C> { + /// The remote free list + /// + /// Slots freed from a remote thread are pushed onto this list. + remote: stack::TransferStack<C>, + // Total size of the page. + // + // If the head index of the local or remote free list is greater than the size of the + // page, then that free list is emtpy. If the head of both free lists is greater than `size` + // then there are no slots left in that page. + size: usize, + prev_sz: usize, + slab: UnsafeCell<Option<Slots<T, C>>>, +} + +type Slots<T, C> = Box<[Slot<T, C>]>; + +impl Local { + pub(crate) fn new() -> Self { + Self { + head: UnsafeCell::new(0), + } + } + + #[inline(always)] + fn head(&self) -> usize { + self.head.with(|head| unsafe { *head }) + } + + #[inline(always)] + fn set_head(&self, new_head: usize) { + self.head.with_mut(|head| unsafe { + *head = new_head; + }) + } +} + +impl<C: cfg::Config> FreeList<C> for Local { + fn push<T>(&self, new_head: usize, slot: &Slot<T, C>) { + slot.set_next(self.head()); + self.set_head(new_head); + } +} + +impl<T, C> Shared<T, C> +where + C: cfg::Config, +{ + const NULL: usize = Addr::<C>::NULL; + + pub(crate) fn new(size: usize, prev_sz: usize) -> Self { + Self { + prev_sz, + size, + remote: stack::TransferStack::new(), + slab: UnsafeCell::new(None), + } + } + + /// Return the head of the freelist + /// + /// If there is space on the local list, it returns the head of the local list. Otherwise, it + /// pops all the slots from the global list and returns the head of that list + /// + /// *Note*: The local list's head is reset when setting the new state in the slot pointed to be + /// `head` returned from this function + #[inline] + fn pop(&self, local: &Local) -> Option<usize> { + let head = local.head(); + + test_println!("-> local head {:?}", head); + + // are there any items on the local free list? (fast path) + let head = if head < self.size { + head + } else { + // slow path: if the local free list is empty, pop all the items on + // the remote free list. + let head = self.remote.pop_all(); + + test_println!("-> remote head {:?}", head); + head? + }; + + // if the head is still null, both the local and remote free lists are + // empty --- we can't fit any more items on this page. + if head == Self::NULL { + test_println!("-> NULL! {:?}", head); + None + } else { + Some(head) + } + } + + /// Returns `true` if storage is currently allocated for this page, `false` + /// otherwise. + #[inline] + fn is_unallocated(&self) -> bool { + self.slab.with(|s| unsafe { (*s).is_none() }) + } + + #[inline] + pub(crate) fn with_slot<'a, U>( + &'a self, + addr: Addr<C>, + f: impl FnOnce(&'a Slot<T, C>) -> Option<U>, + ) -> Option<U> { + let poff = addr.offset() - self.prev_sz; + + test_println!("-> offset {:?}", poff); + + self.slab.with(|slab| { + let slot = unsafe { &*slab }.as_ref()?.get(poff)?; + f(slot) + }) + } + + #[inline(always)] + pub(crate) fn free_list(&self) -> &impl FreeList<C> { + &self.remote + } +} + +impl<'a, T, C> Shared<Option<T>, C> +where + C: cfg::Config + 'a, +{ + pub(crate) fn take<F>( + &self, + addr: Addr<C>, + gen: slot::Generation<C>, + free_list: &F, + ) -> Option<T> + where + F: FreeList<C>, + { + let offset = addr.offset() - self.prev_sz; + + test_println!("-> take: offset {:?}", offset); + + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref()?; + let slot = slab.get(offset)?; + slot.remove_value(gen, offset, free_list) + }) + } + + pub(crate) fn remove<F: FreeList<C>>( + &self, + addr: Addr<C>, + gen: slot::Generation<C>, + free_list: &F, + ) -> bool { + let offset = addr.offset() - self.prev_sz; + + test_println!("-> offset {:?}", offset); + + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot.try_remove_value(gen, offset, free_list) + } else { + false + } + }) + } + + // Need this function separately, as we need to pass a function pointer to `filter_map` and + // `Slot::value` just returns a `&T`, specifically a `&Option<T>` for this impl. + fn make_ref(slot: &'a Slot<Option<T>, C>) -> Option<&'a T> { + slot.value().as_ref() + } + + pub(crate) fn iter(&self) -> Option<Iter<'a, T, C>> { + let slab = self.slab.with(|slab| unsafe { (&*slab).as_ref() }); + slab.map(|slab| { + slab.iter() + .filter_map(Shared::make_ref as fn(&'a Slot<Option<T>, C>) -> Option<&'a T>) + }) + } +} + +impl<T, C> Shared<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + pub(crate) fn init_with<U>( + &self, + local: &Local, + init: impl FnOnce(usize, &Slot<T, C>) -> Option<U>, + ) -> Option<U> { + let head = self.pop(local)?; + + // do we need to allocate storage for this page? + if self.is_unallocated() { + self.allocate(); + } + + let index = head + self.prev_sz; + + let result = self.slab.with(|slab| { + let slab = unsafe { &*(slab) } + .as_ref() + .expect("page must have been allocated to insert!"); + let slot = &slab[head]; + let result = init(index, slot)?; + local.set_head(slot.next()); + Some(result) + })?; + + test_println!("-> init_with: insert at offset: {}", index); + Some(result) + } + + /// Allocates storage for the page's slots. + #[cold] + fn allocate(&self) { + test_println!("-> alloc new page ({})", self.size); + debug_assert!(self.is_unallocated()); + + let mut slab = Vec::with_capacity(self.size); + slab.extend((1..self.size).map(Slot::new)); + slab.push(Slot::new(Self::NULL)); + self.slab.with_mut(|s| { + // safety: this mut access is safe — it only occurs to initially allocate the page, + // which only happens on this thread; if the page has not yet been allocated, other + // threads will not try to access it yet. + unsafe { + *s = Some(slab.into_boxed_slice()); + } + }); + } + + pub(crate) fn mark_clear<F: FreeList<C>>( + &self, + addr: Addr<C>, + gen: slot::Generation<C>, + free_list: &F, + ) -> bool { + let offset = addr.offset() - self.prev_sz; + + test_println!("-> offset {:?}", offset); + + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot.try_clear_storage(gen, offset, free_list) + } else { + false + } + }) + } + + pub(crate) fn clear<F: FreeList<C>>( + &self, + addr: Addr<C>, + gen: slot::Generation<C>, + free_list: &F, + ) -> bool { + let offset = addr.offset() - self.prev_sz; + + test_println!("-> offset {:?}", offset); + + self.slab.with(|slab| { + let slab = unsafe { &*slab }.as_ref(); + if let Some(slot) = slab.and_then(|slab| slab.get(offset)) { + slot.clear_storage(gen, offset, free_list) + } else { + false + } + }) + } +} + +impl fmt::Debug for Local { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.head.with(|head| { + let head = unsafe { *head }; + f.debug_struct("Local") + .field("head", &format_args!("{:#0x}", head)) + .finish() + }) + } +} + +impl<C, T> fmt::Debug for Shared<C, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Shared") + .field("remote", &self.remote) + .field("prev_sz", &self.prev_sz) + .field("size", &self.size) + // .field("slab", &self.slab) + .finish() + } +} + +impl<C: cfg::Config> fmt::Debug for Addr<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Addr") + .field("addr", &format_args!("{:#0x}", &self.addr)) + .field("index", &self.index()) + .field("offset", &self.offset()) + .finish() + } +} + +impl<C: cfg::Config> PartialEq for Addr<C> { + fn eq(&self, other: &Self) -> bool { + self.addr == other.addr + } +} + +impl<C: cfg::Config> Eq for Addr<C> {} + +impl<C: cfg::Config> PartialOrd for Addr<C> { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.addr.partial_cmp(&other.addr) + } +} + +impl<C: cfg::Config> Ord for Addr<C> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.addr.cmp(&other.addr) + } +} + +impl<C: cfg::Config> Clone for Addr<C> { + fn clone(&self) -> Self { + Self::from_usize(self.addr) + } +} + +impl<C: cfg::Config> Copy for Addr<C> {} + +#[inline(always)] +pub(crate) fn indices<C: cfg::Config>(idx: usize) -> (Addr<C>, usize) { + let addr = C::unpack_addr(idx); + (addr, addr.index()) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::Pack; + use proptest::prelude::*; + + proptest! { + #[test] + fn addr_roundtrips(pidx in 0usize..Addr::<cfg::DefaultConfig>::BITS) { + let addr = Addr::<cfg::DefaultConfig>::from_usize(pidx); + let packed = addr.pack(0); + assert_eq!(addr, Addr::from_packed(packed)); + } + #[test] + fn gen_roundtrips(gen in 0usize..slot::Generation::<cfg::DefaultConfig>::BITS) { + let gen = slot::Generation::<cfg::DefaultConfig>::from_usize(gen); + let packed = gen.pack(0); + assert_eq!(gen, slot::Generation::from_packed(packed)); + } + + #[test] + fn page_roundtrips( + gen in 0usize..slot::Generation::<cfg::DefaultConfig>::BITS, + addr in 0usize..Addr::<cfg::DefaultConfig>::BITS, + ) { + let gen = slot::Generation::<cfg::DefaultConfig>::from_usize(gen); + let addr = Addr::<cfg::DefaultConfig>::from_usize(addr); + let packed = gen.pack(addr.pack(0)); + assert_eq!(addr, Addr::from_packed(packed)); + assert_eq!(gen, slot::Generation::from_packed(packed)); + } + } +} diff --git a/vendor/sharded-slab/src/page/slot.rs b/vendor/sharded-slab/src/page/slot.rs new file mode 100644 index 000000000..3387d5388 --- /dev/null +++ b/vendor/sharded-slab/src/page/slot.rs @@ -0,0 +1,920 @@ +use super::FreeList; +use crate::sync::{ + atomic::{AtomicUsize, Ordering}, + hint, UnsafeCell, +}; +use crate::{cfg, clear::Clear, Pack, Tid}; +use std::{fmt, marker::PhantomData, mem, ptr, thread}; + +pub(crate) struct Slot<T, C> { + lifecycle: AtomicUsize, + /// The offset of the next item on the free list. + next: UnsafeCell<usize>, + /// The data stored in the slot. + item: UnsafeCell<T>, + _cfg: PhantomData<fn(C)>, +} + +#[derive(Debug)] +pub(crate) struct Guard<T, C: cfg::Config = cfg::DefaultConfig> { + slot: ptr::NonNull<Slot<T, C>>, +} + +#[derive(Debug)] +pub(crate) struct InitGuard<T, C: cfg::Config = cfg::DefaultConfig> { + slot: ptr::NonNull<Slot<T, C>>, + curr_lifecycle: usize, + released: bool, +} + +#[repr(transparent)] +pub(crate) struct Generation<C = cfg::DefaultConfig> { + value: usize, + _cfg: PhantomData<fn(C)>, +} + +#[repr(transparent)] +pub(crate) struct RefCount<C = cfg::DefaultConfig> { + value: usize, + _cfg: PhantomData<fn(C)>, +} + +pub(crate) struct Lifecycle<C> { + state: State, + _cfg: PhantomData<fn(C)>, +} +struct LifecycleGen<C>(Generation<C>); + +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +#[repr(usize)] +enum State { + Present = 0b00, + Marked = 0b01, + Removing = 0b11, +} + +impl<C: cfg::Config> Pack<C> for Generation<C> { + /// Use all the remaining bits in the word for the generation counter, minus + /// any bits reserved by the user. + const LEN: usize = (cfg::WIDTH - C::RESERVED_BITS) - Self::SHIFT; + + type Prev = Tid<C>; + + #[inline(always)] + fn from_usize(u: usize) -> Self { + debug_assert!(u <= Self::BITS); + Self::new(u) + } + + #[inline(always)] + fn as_usize(&self) -> usize { + self.value + } +} + +impl<C: cfg::Config> Generation<C> { + fn new(value: usize) -> Self { + Self { + value, + _cfg: PhantomData, + } + } +} + +// Slot methods which should work across all trait bounds +impl<T, C> Slot<T, C> +where + C: cfg::Config, +{ + #[inline(always)] + pub(super) fn next(&self) -> usize { + self.next.with(|next| unsafe { *next }) + } + + #[inline(always)] + pub(crate) fn value(&self) -> &T { + self.item.with(|item| unsafe { &*item }) + } + + #[inline(always)] + pub(super) fn set_next(&self, next: usize) { + self.next.with_mut(|n| unsafe { + (*n) = next; + }) + } + + #[inline(always)] + pub(crate) fn get(&self, gen: Generation<C>) -> Option<Guard<T, C>> { + let mut lifecycle = self.lifecycle.load(Ordering::Acquire); + loop { + // Unpack the current state. + let state = Lifecycle::<C>::from_packed(lifecycle); + let current_gen = LifecycleGen::<C>::from_packed(lifecycle).0; + let refs = RefCount::<C>::from_packed(lifecycle); + + test_println!( + "-> get {:?}; current_gen={:?}; lifecycle={:#x}; state={:?}; refs={:?};", + gen, + current_gen, + lifecycle, + state, + refs, + ); + + // Is it okay to access this slot? The accessed generation must be + // current, and the slot must not be in the process of being + // removed. If we can no longer access the slot at the given + // generation, return `None`. + if gen != current_gen || state != Lifecycle::PRESENT { + test_println!("-> get: no longer exists!"); + return None; + } + + // Try to increment the slot's ref count by one. + let new_refs = refs.incr()?; + match self.lifecycle.compare_exchange( + lifecycle, + new_refs.pack(current_gen.pack(state.pack(0))), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + test_println!("-> {:?}", new_refs); + return Some(Guard { + slot: ptr::NonNull::from(self), + }); + } + Err(actual) => { + // Another thread modified the slot's state before us! We + // need to retry with the new state. + // + // Since the new state may mean that the accessed generation + // is no longer valid, we'll check again on the next + // iteration of the loop. + test_println!("-> get: retrying; lifecycle={:#x};", actual); + lifecycle = actual; + } + }; + } + } + + /// Marks this slot to be released, returning `true` if the slot can be + /// mutated *now* and `false` otherwise. + /// + /// This method checks if there are any references to this slot. If there _are_ valid + /// references, it just marks them for modification and returns and the next thread calling + /// either `clear_storage` or `remove_value` will try and modify the storage + fn mark_release(&self, gen: Generation<C>) -> Option<bool> { + let mut lifecycle = self.lifecycle.load(Ordering::Acquire); + let mut curr_gen; + + // Try to advance the slot's state to "MARKED", which indicates that it + // should be removed when it is no longer concurrently accessed. + loop { + curr_gen = LifecycleGen::from_packed(lifecycle).0; + test_println!( + "-> mark_release; gen={:?}; current_gen={:?};", + gen, + curr_gen + ); + + // Is the slot still at the generation we are trying to remove? + if gen != curr_gen { + return None; + } + + let state = Lifecycle::<C>::from_packed(lifecycle).state; + test_println!("-> mark_release; state={:?};", state); + match state { + State::Removing => { + test_println!("--> mark_release; cannot release (already removed!)"); + return None; + } + State::Marked => { + test_println!("--> mark_release; already marked;"); + break; + } + State::Present => {} + }; + + // Set the new state to `MARKED`. + let new_lifecycle = Lifecycle::<C>::MARKED.pack(lifecycle); + test_println!( + "-> mark_release; old_lifecycle={:#x}; new_lifecycle={:#x};", + lifecycle, + new_lifecycle + ); + + match self.lifecycle.compare_exchange( + lifecycle, + new_lifecycle, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => { + test_println!("-> mark_release; retrying"); + lifecycle = actual; + } + } + } + + // Unpack the current reference count to see if we can remove the slot now. + let refs = RefCount::<C>::from_packed(lifecycle); + test_println!("-> mark_release: marked; refs={:?};", refs); + + // Are there currently outstanding references to the slot? If so, it + // will have to be removed when those references are dropped. + Some(refs.value == 0) + } + + /// Mutates this slot. + /// + /// This method spins until no references to this slot are left, and calls the mutator + fn release_with<F, M, R>(&self, gen: Generation<C>, offset: usize, free: &F, mutator: M) -> R + where + F: FreeList<C>, + M: FnOnce(Option<&mut T>) -> R, + { + let mut lifecycle = self.lifecycle.load(Ordering::Acquire); + let mut advanced = false; + // Exponential spin backoff while waiting for the slot to be released. + let mut spin_exp = 0; + let next_gen = gen.advance(); + loop { + let current_gen = Generation::from_packed(lifecycle); + test_println!("-> release_with; lifecycle={:#x}; expected_gen={:?}; current_gen={:?}; next_gen={:?};", + lifecycle, + gen, + current_gen, + next_gen + ); + + // First, make sure we are actually able to remove the value. + // If we're going to remove the value, the generation has to match + // the value that `remove_value` was called with...unless we've + // already stored the new generation. + if (!advanced) && gen != current_gen { + test_println!("-> already removed!"); + return mutator(None); + } + + match self.lifecycle.compare_exchange( + lifecycle, + next_gen.pack(lifecycle), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(actual) => { + // If we're in this state, we have successfully advanced to + // the next generation. + advanced = true; + + // Make sure that there are no outstanding references. + let refs = RefCount::<C>::from_packed(actual); + test_println!("-> advanced gen; lifecycle={:#x}; refs={:?};", actual, refs); + if refs.value == 0 { + test_println!("-> ok to remove!"); + // safety: we've modified the generation of this slot and any other thread + // calling this method will exit out at the generation check above in the + // next iteraton of the loop. + let value = self + .item + .with_mut(|item| mutator(Some(unsafe { &mut *item }))); + free.push(offset, self); + return value; + } + + // Otherwise, a reference must be dropped before we can + // remove the value. Spin here until there are no refs remaining... + test_println!("-> refs={:?}; spin...", refs); + + // Back off, spinning and possibly yielding. + exponential_backoff(&mut spin_exp); + } + Err(actual) => { + test_println!("-> retrying; lifecycle={:#x};", actual); + lifecycle = actual; + // The state changed; reset the spin backoff. + spin_exp = 0; + } + } + } + } + + /// Initialize a slot + /// + /// This method initializes and sets up the state for a slot. When being used in `Pool`, we + /// only need to ensure that the `Slot` is in the right `state, while when being used in a + /// `Slab` we want to insert a value into it, as the memory is not initialized + pub(crate) fn init(&self) -> Option<InitGuard<T, C>> { + // Load the current lifecycle state. + let lifecycle = self.lifecycle.load(Ordering::Acquire); + let gen = LifecycleGen::<C>::from_packed(lifecycle).0; + let refs = RefCount::<C>::from_packed(lifecycle); + + test_println!( + "-> initialize_state; state={:?}; gen={:?}; refs={:?};", + Lifecycle::<C>::from_packed(lifecycle), + gen, + refs, + ); + + if refs.value != 0 { + test_println!("-> initialize while referenced! cancelling"); + return None; + } + + Some(InitGuard { + slot: ptr::NonNull::from(self), + curr_lifecycle: lifecycle, + released: false, + }) + } +} + +// Slot impl which _needs_ an `Option` for self.item, this is for `Slab` to use. +impl<T, C> Slot<Option<T>, C> +where + C: cfg::Config, +{ + fn is_empty(&self) -> bool { + self.item.with(|item| unsafe { (*item).is_none() }) + } + + /// Insert a value into a slot + /// + /// We first initialize the state and then insert the pased in value into the slot. + #[inline] + pub(crate) fn insert(&self, value: &mut Option<T>) -> Option<Generation<C>> { + debug_assert!(self.is_empty(), "inserted into full slot"); + debug_assert!(value.is_some(), "inserted twice"); + + let mut guard = self.init()?; + let gen = guard.generation(); + unsafe { + // Safety: Accessing the value of an `InitGuard` is unsafe because + // it has a pointer to a slot which may dangle. Here, we know the + // pointed slot is alive because we have a reference to it in scope, + // and the `InitGuard` will be dropped when this function returns. + mem::swap(guard.value_mut(), value); + guard.release(); + }; + test_println!("-> inserted at {:?}", gen); + + Some(gen) + } + + /// Tries to remove the value in the slot, returning `true` if the value was + /// removed. + /// + /// This method tries to remove the value in the slot. If there are existing references, then + /// the slot is marked for removal and the next thread calling either this method or + /// `remove_value` will do the work instead. + #[inline] + pub(super) fn try_remove_value<F: FreeList<C>>( + &self, + gen: Generation<C>, + offset: usize, + free: &F, + ) -> bool { + let should_remove = match self.mark_release(gen) { + // If `mark_release` returns `Some`, a value exists at this + // generation. The bool inside this option indicates whether or not + // _we're_ allowed to remove the value. + Some(should_remove) => should_remove, + // Otherwise, the generation we tried to remove has already expired, + // and we did not mark anything for removal. + None => { + test_println!( + "-> try_remove_value; nothing exists at generation={:?}", + gen + ); + return false; + } + }; + + test_println!("-> try_remove_value; marked!"); + + if should_remove { + // We're allowed to remove the slot now! + test_println!("-> try_remove_value; can remove now"); + self.remove_value(gen, offset, free); + } + + true + } + + #[inline] + pub(super) fn remove_value<F: FreeList<C>>( + &self, + gen: Generation<C>, + offset: usize, + free: &F, + ) -> Option<T> { + self.release_with(gen, offset, free, |item| item.and_then(Option::take)) + } +} + +// These impls are specific to `Pool` +impl<T, C> Slot<T, C> +where + T: Default + Clear, + C: cfg::Config, +{ + pub(in crate::page) fn new(next: usize) -> Self { + Self { + lifecycle: AtomicUsize::new(Lifecycle::<C>::REMOVING.as_usize()), + item: UnsafeCell::new(T::default()), + next: UnsafeCell::new(next), + _cfg: PhantomData, + } + } + + /// Try to clear this slot's storage + /// + /// If there are references to this slot, then we mark this slot for clearing and let the last + /// thread do the work for us. + #[inline] + pub(super) fn try_clear_storage<F: FreeList<C>>( + &self, + gen: Generation<C>, + offset: usize, + free: &F, + ) -> bool { + let should_clear = match self.mark_release(gen) { + // If `mark_release` returns `Some`, a value exists at this + // generation. The bool inside this option indicates whether or not + // _we're_ allowed to clear the value. + Some(should_clear) => should_clear, + // Otherwise, the generation we tried to remove has already expired, + // and we did not mark anything for removal. + None => { + test_println!( + "-> try_clear_storage; nothing exists at generation={:?}", + gen + ); + return false; + } + }; + + test_println!("-> try_clear_storage; marked!"); + + if should_clear { + // We're allowed to remove the slot now! + test_println!("-> try_remove_value; can clear now"); + return self.clear_storage(gen, offset, free); + } + + true + } + + /// Clear this slot's storage + /// + /// This method blocks until all references have been dropped and clears the storage. + pub(super) fn clear_storage<F: FreeList<C>>( + &self, + gen: Generation<C>, + offset: usize, + free: &F, + ) -> bool { + // release_with will _always_ wait unitl it can release the slot or just return if the slot + // has already been released. + self.release_with(gen, offset, free, |item| { + let cleared = item.map(|inner| Clear::clear(inner)).is_some(); + test_println!("-> cleared: {}", cleared); + cleared + }) + } +} + +impl<T, C: cfg::Config> Slot<T, C> { + fn release(&self) -> bool { + let mut lifecycle = self.lifecycle.load(Ordering::Acquire); + loop { + let refs = RefCount::<C>::from_packed(lifecycle); + let state = Lifecycle::<C>::from_packed(lifecycle).state; + let gen = LifecycleGen::<C>::from_packed(lifecycle).0; + + // Are we the last guard, and is the slot marked for removal? + let dropping = refs.value == 1 && state == State::Marked; + let new_lifecycle = if dropping { + // If so, we want to advance the state to "removing" + gen.pack(State::Removing as usize) + } else { + // Otherwise, just subtract 1 from the ref count. + refs.decr().pack(lifecycle) + }; + + test_println!( + "-> drop guard: state={:?}; gen={:?}; refs={:?}; lifecycle={:#x}; new_lifecycle={:#x}; dropping={:?}", + state, + gen, + refs, + lifecycle, + new_lifecycle, + dropping + ); + match self.lifecycle.compare_exchange( + lifecycle, + new_lifecycle, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + test_println!("-> drop guard: done; dropping={:?}", dropping); + return dropping; + } + Err(actual) => { + test_println!("-> drop guard; retry, actual={:#x}", actual); + lifecycle = actual; + } + } + } + } +} + +impl<T, C: cfg::Config> fmt::Debug for Slot<T, C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let lifecycle = self.lifecycle.load(Ordering::Relaxed); + f.debug_struct("Slot") + .field("lifecycle", &format_args!("{:#x}", lifecycle)) + .field("state", &Lifecycle::<C>::from_packed(lifecycle).state) + .field("gen", &LifecycleGen::<C>::from_packed(lifecycle).0) + .field("refs", &RefCount::<C>::from_packed(lifecycle)) + .field("next", &self.next()) + .finish() + } +} + +// === impl Generation === + +impl<C> fmt::Debug for Generation<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Generation").field(&self.value).finish() + } +} + +impl<C: cfg::Config> Generation<C> { + fn advance(self) -> Self { + Self::from_usize((self.value + 1) % Self::BITS) + } +} + +impl<C: cfg::Config> PartialEq for Generation<C> { + fn eq(&self, other: &Self) -> bool { + self.value == other.value + } +} + +impl<C: cfg::Config> Eq for Generation<C> {} + +impl<C: cfg::Config> PartialOrd for Generation<C> { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.value.partial_cmp(&other.value) + } +} + +impl<C: cfg::Config> Ord for Generation<C> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.value.cmp(&other.value) + } +} + +impl<C: cfg::Config> Clone for Generation<C> { + fn clone(&self) -> Self { + Self::new(self.value) + } +} + +impl<C: cfg::Config> Copy for Generation<C> {} + +// === impl Guard === + +impl<T, C: cfg::Config> Guard<T, C> { + /// Releases the guard, returning `true` if the slot should be cleared. + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `Guard` does not outlive the slab that contains + /// the pointed slot. Failure to do so means this pointer may dangle. + #[inline] + pub(crate) unsafe fn release(&self) -> bool { + self.slot().release() + } + + /// Returns a borrowed reference to the slot. + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `Guard` does not outlive the slab that contains + /// the pointed slot. Failure to do so means this pointer may dangle. + #[inline] + pub(crate) unsafe fn slot(&self) -> &Slot<T, C> { + self.slot.as_ref() + } + + /// Returns a borrowed reference to the slot's value. + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `Guard` does not outlive the slab that contains + /// the pointed slot. Failure to do so means this pointer may dangle. + #[inline(always)] + pub(crate) unsafe fn value(&self) -> &T { + self.slot().item.with(|item| &*item) + } +} + +// === impl Lifecycle === + +impl<C: cfg::Config> Lifecycle<C> { + const MARKED: Self = Self { + state: State::Marked, + _cfg: PhantomData, + }; + const REMOVING: Self = Self { + state: State::Removing, + _cfg: PhantomData, + }; + const PRESENT: Self = Self { + state: State::Present, + _cfg: PhantomData, + }; +} + +impl<C: cfg::Config> Pack<C> for Lifecycle<C> { + const LEN: usize = 2; + type Prev = (); + + fn from_usize(u: usize) -> Self { + Self { + state: match u & Self::MASK { + 0b00 => State::Present, + 0b01 => State::Marked, + 0b11 => State::Removing, + bad => unreachable!("weird lifecycle {:#b}", bad), + }, + _cfg: PhantomData, + } + } + + fn as_usize(&self) -> usize { + self.state as usize + } +} + +impl<C> PartialEq for Lifecycle<C> { + fn eq(&self, other: &Self) -> bool { + self.state == other.state + } +} + +impl<C> Eq for Lifecycle<C> {} + +impl<C> fmt::Debug for Lifecycle<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Lifecycle").field(&self.state).finish() + } +} + +// === impl RefCount === + +impl<C: cfg::Config> Pack<C> for RefCount<C> { + const LEN: usize = cfg::WIDTH - (Lifecycle::<C>::LEN + Generation::<C>::LEN); + type Prev = Lifecycle<C>; + + fn from_usize(value: usize) -> Self { + debug_assert!(value <= Self::BITS); + Self { + value, + _cfg: PhantomData, + } + } + + fn as_usize(&self) -> usize { + self.value + } +} + +impl<C: cfg::Config> RefCount<C> { + pub(crate) const MAX: usize = Self::BITS - 1; + + #[inline] + fn incr(self) -> Option<Self> { + if self.value >= Self::MAX { + test_println!("-> get: {}; MAX={}", self.value, RefCount::<C>::MAX); + return None; + } + + Some(Self::from_usize(self.value + 1)) + } + + #[inline] + fn decr(self) -> Self { + Self::from_usize(self.value - 1) + } +} + +impl<C> fmt::Debug for RefCount<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("RefCount").field(&self.value).finish() + } +} + +impl<C: cfg::Config> PartialEq for RefCount<C> { + fn eq(&self, other: &Self) -> bool { + self.value == other.value + } +} + +impl<C: cfg::Config> Eq for RefCount<C> {} + +impl<C: cfg::Config> PartialOrd for RefCount<C> { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.value.partial_cmp(&other.value) + } +} + +impl<C: cfg::Config> Ord for RefCount<C> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.value.cmp(&other.value) + } +} + +impl<C: cfg::Config> Clone for RefCount<C> { + fn clone(&self) -> Self { + Self::from_usize(self.value) + } +} + +impl<C: cfg::Config> Copy for RefCount<C> {} + +// === impl LifecycleGen === + +impl<C: cfg::Config> Pack<C> for LifecycleGen<C> { + const LEN: usize = Generation::<C>::LEN; + type Prev = RefCount<C>; + + fn from_usize(value: usize) -> Self { + Self(Generation::from_usize(value)) + } + + fn as_usize(&self) -> usize { + self.0.as_usize() + } +} + +impl<T, C: cfg::Config> InitGuard<T, C> { + pub(crate) fn generation(&self) -> Generation<C> { + LifecycleGen::<C>::from_packed(self.curr_lifecycle).0 + } + + /// Returns a borrowed reference to the slot's value. + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `InitGuard` does not outlive the slab that + /// contains the pointed slot. Failure to do so means this pointer may + /// dangle. + pub(crate) unsafe fn value(&self) -> &T { + self.slot.as_ref().item.with(|val| &*val) + } + + /// Returns a mutably borrowed reference to the slot's value. + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `InitGuard` does not outlive the slab that + /// contains the pointed slot. Failure to do so means this pointer may + /// dangle. + /// + /// It's safe to reference the slot mutably, though, because creating an + /// `InitGuard` ensures there are no outstanding immutable references. + pub(crate) unsafe fn value_mut(&mut self) -> &mut T { + self.slot.as_ref().item.with_mut(|val| &mut *val) + } + + /// Releases the guard, returning `true` if the slot should be cleared. + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `InitGuard` does not outlive the slab that + /// contains the pointed slot. Failure to do so means this pointer may + /// dangle. + pub(crate) unsafe fn release(&mut self) -> bool { + self.release2(0) + } + + /// Downgrades the guard to an immutable guard + /// + /// ## Safety + /// + /// This dereferences a raw pointer to the slot. The caller is responsible + /// for ensuring that the `InitGuard` does not outlive the slab that + /// contains the pointed slot. Failure to do so means this pointer may + /// dangle. + pub(crate) unsafe fn downgrade(&mut self) -> Guard<T, C> { + let _ = self.release2(RefCount::<C>::from_usize(1).pack(0)); + Guard { slot: self.slot } + } + + unsafe fn release2(&mut self, new_refs: usize) -> bool { + test_println!( + "InitGuard::release; curr_lifecycle={:?}; downgrading={}", + Lifecycle::<C>::from_packed(self.curr_lifecycle), + new_refs != 0, + ); + if self.released { + test_println!("-> already released!"); + return false; + } + self.released = true; + let mut curr_lifecycle = self.curr_lifecycle; + let slot = self.slot.as_ref(); + let new_lifecycle = LifecycleGen::<C>::from_packed(self.curr_lifecycle) + .pack(Lifecycle::<C>::PRESENT.pack(new_refs)); + + match slot.lifecycle.compare_exchange( + curr_lifecycle, + new_lifecycle, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + test_println!("--> advanced to PRESENT; done"); + return false; + } + Err(actual) => { + test_println!( + "--> lifecycle changed; actual={:?}", + Lifecycle::<C>::from_packed(actual) + ); + curr_lifecycle = actual; + } + } + + // if the state was no longer the prior state, we are now responsible + // for releasing the slot. + loop { + let refs = RefCount::<C>::from_packed(curr_lifecycle); + let state = Lifecycle::<C>::from_packed(curr_lifecycle).state; + + test_println!( + "-> InitGuard::release; lifecycle={:#x}; state={:?}; refs={:?};", + curr_lifecycle, + state, + refs, + ); + + debug_assert!(state == State::Marked || thread::panicking(), "state was not MARKED; someone else has removed the slot while we have exclusive access!\nactual={:?}", state); + debug_assert!(refs.value == 0 || thread::panicking(), "ref count was not 0; someone else has referenced the slot while we have exclusive access!\nactual={:?}", refs); + let new_lifecycle = self.generation().pack(State::Removing as usize); + + match slot.lifecycle.compare_exchange( + curr_lifecycle, + new_lifecycle, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + test_println!("-> InitGuard::RELEASE: done!"); + return true; + } + Err(actual) => { + debug_assert!(thread::panicking(), "we should not have to retry this CAS!"); + test_println!("-> InitGuard::release; retry, actual={:#x}", actual); + curr_lifecycle = actual; + } + } + } + } +} + +// === helpers === + +#[inline(always)] +fn exponential_backoff(exp: &mut usize) { + /// Maximum exponent we can back off to. + const MAX_EXPONENT: usize = 8; + + // Issue 2^exp pause instructions. + for _ in 0..(1 << *exp) { + hint::spin_loop(); + } + + if *exp >= MAX_EXPONENT { + // If we have reached the max backoff, also yield to the scheduler + // explicitly. + crate::sync::yield_now(); + } else { + // Otherwise, increment the exponent. + *exp += 1; + } +} diff --git a/vendor/sharded-slab/src/page/stack.rs b/vendor/sharded-slab/src/page/stack.rs new file mode 100644 index 000000000..e28d9b1a7 --- /dev/null +++ b/vendor/sharded-slab/src/page/stack.rs @@ -0,0 +1,124 @@ +use crate::cfg; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use std::{fmt, marker::PhantomData}; + +pub(super) struct TransferStack<C = cfg::DefaultConfig> { + head: AtomicUsize, + _cfg: PhantomData<fn(C)>, +} + +impl<C: cfg::Config> TransferStack<C> { + pub(super) fn new() -> Self { + Self { + head: AtomicUsize::new(super::Addr::<C>::NULL), + _cfg: PhantomData, + } + } + + pub(super) fn pop_all(&self) -> Option<usize> { + let val = self.head.swap(super::Addr::<C>::NULL, Ordering::Acquire); + test_println!("-> pop {:#x}", val); + if val == super::Addr::<C>::NULL { + None + } else { + Some(val) + } + } + + fn push(&self, new_head: usize, before: impl Fn(usize)) { + // We loop to win the race to set the new head. The `next` variable + // is the next slot on the stack which needs to be pointed to by the + // new head. + let mut next = self.head.load(Ordering::Relaxed); + loop { + test_println!("-> next {:#x}", next); + before(next); + + match self + .head + .compare_exchange(next, new_head, Ordering::Release, Ordering::Relaxed) + { + // lost the race! + Err(actual) => { + test_println!("-> retry!"); + next = actual; + } + Ok(_) => { + test_println!("-> successful; next={:#x}", next); + return; + } + } + } + } +} + +impl<C: cfg::Config> super::FreeList<C> for TransferStack<C> { + fn push<T>(&self, new_head: usize, slot: &super::Slot<T, C>) { + self.push(new_head, |next| slot.set_next(next)) + } +} + +impl<C> fmt::Debug for TransferStack<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TransferStack") + .field( + "head", + &format_args!("{:#0x}", &self.head.load(Ordering::Relaxed)), + ) + .finish() + } +} + +#[cfg(all(loom, test))] +mod test { + use super::*; + use crate::{sync::UnsafeCell, test_util}; + use loom::thread; + use std::sync::Arc; + + #[test] + fn transfer_stack() { + test_util::run_model("transfer_stack", || { + let causalities = [UnsafeCell::new(999), UnsafeCell::new(999)]; + let shared = Arc::new((causalities, TransferStack::<cfg::DefaultConfig>::new())); + let shared1 = shared.clone(); + let shared2 = shared.clone(); + + let t1 = thread::spawn(move || { + let (causalities, stack) = &*shared1; + stack.push(0, |prev| { + causalities[0].with_mut(|c| unsafe { + *c = 0; + }); + test_println!("prev={:#x}", prev) + }); + }); + let t2 = thread::spawn(move || { + let (causalities, stack) = &*shared2; + stack.push(1, |prev| { + causalities[1].with_mut(|c| unsafe { + *c = 1; + }); + test_println!("prev={:#x}", prev) + }); + }); + + let (causalities, stack) = &*shared; + let mut idx = stack.pop_all(); + while idx == None { + idx = stack.pop_all(); + thread::yield_now(); + } + let idx = idx.unwrap(); + causalities[idx].with(|val| unsafe { + assert_eq!( + *val, idx, + "UnsafeCell write must happen-before index is pushed to the stack!" + ); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + }); + } +} diff --git a/vendor/sharded-slab/src/pool.rs b/vendor/sharded-slab/src/pool.rs new file mode 100644 index 000000000..115e36eef --- /dev/null +++ b/vendor/sharded-slab/src/pool.rs @@ -0,0 +1,1342 @@ +//! A lock-free concurrent object pool. +//! +//! See the [`Pool` type's documentation][pool] for details on the object pool API and how +//! it differs from the [`Slab`] API. +//! +//! [pool]: ../struct.Pool.html +//! [`Slab`]: ../struct.Slab.html +use crate::{ + cfg::{self, CfgPrivate, DefaultConfig}, + clear::Clear, + page, shard, + tid::Tid, + Pack, Shard, +}; + +use std::{fmt, marker::PhantomData, sync::Arc}; + +/// A lock-free concurrent object pool. +/// +/// Slabs provide pre-allocated storage for many instances of a single type. But, when working with +/// heap allocated objects, the advantages of a slab are lost, as the memory allocated for the +/// object is freed when the object is removed from the slab. With a pool, we can instead reuse +/// this memory for objects being added to the pool in the future, therefore reducing memory +/// fragmentation and avoiding additional allocations. +/// +/// This type implements a lock-free concurrent pool, indexed by `usize`s. The items stored in this +/// type need to implement [`Clear`] and `Default`. +/// +/// The `Pool` type shares similar semantics to [`Slab`] when it comes to sharing across threads +/// and storing mutable shared data. The biggest difference is there are no [`Slab::insert`] and +/// [`Slab::take`] analouges for the `Pool` type. Instead new items are added to the pool by using +/// the [`Pool::create`] method, and marked for clearing by the [`Pool::clear`] method. +/// +/// # Examples +/// +/// Add an entry to the pool, returning an index: +/// ``` +/// # use sharded_slab::Pool; +/// let pool: Pool<String> = Pool::new(); +/// +/// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); +/// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); +/// ``` +/// +/// Create a new pooled item, returning a guard that allows mutable access: +/// ``` +/// # use sharded_slab::Pool; +/// let pool: Pool<String> = Pool::new(); +/// +/// let mut guard = pool.create().unwrap(); +/// let key = guard.key(); +/// guard.push_str("hello world"); +/// +/// drop(guard); // release the guard, allowing immutable access. +/// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); +/// ``` +/// +/// Pool entries can be cleared by calling [`Pool::clear`]. This marks the entry to +/// be cleared when the guards referencing to it are dropped. +/// ``` +/// # use sharded_slab::Pool; +/// let pool: Pool<String> = Pool::new(); +/// +/// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); +/// +/// // Mark this entry to be cleared. +/// pool.clear(key); +/// +/// // The cleared entry is no longer available in the pool +/// assert!(pool.get(key).is_none()); +/// ``` +/// # Configuration +/// +/// Both `Pool` and [`Slab`] share the same configuration mechanism. See [crate level documentation][config-doc] +/// for more details. +/// +/// [`Slab::take`]: crate::Slab::take +/// [`Slab::insert`]: crate::Slab::insert +/// [`Pool::create`]: Pool::create +/// [`Pool::clear`]: Pool::clear +/// [config-doc]: crate#configuration +/// [`Clear`]: crate::Clear +/// [`Slab`]: crate::Slab +pub struct Pool<T, C = DefaultConfig> +where + T: Clear + Default, + C: cfg::Config, +{ + shards: shard::Array<T, C>, + _cfg: PhantomData<C>, +} + +/// A guard that allows access to an object in a pool. +/// +/// While the guard exists, it indicates to the pool that the item the guard references is +/// currently being accessed. If the item is removed from the pool while the guard exists, the +/// removal will be deferred until all guards are dropped. +pub struct Ref<'a, T, C = DefaultConfig> +where + T: Clear + Default, + C: cfg::Config, +{ + inner: page::slot::Guard<T, C>, + shard: &'a Shard<T, C>, + key: usize, +} + +/// A guard that allows exclusive mutable access to an object in a pool. +/// +/// While the guard exists, it indicates to the pool that the item the guard +/// references is currently being accessed. If the item is removed from the pool +/// while a guard exists, the removal will be deferred until the guard is +/// dropped. The slot cannot be accessed by other threads while it is accessed +/// mutably. +pub struct RefMut<'a, T, C = DefaultConfig> +where + T: Clear + Default, + C: cfg::Config, +{ + inner: page::slot::InitGuard<T, C>, + shard: &'a Shard<T, C>, + key: usize, +} + +/// An owned guard that allows shared immutable access to an object in a pool. +/// +/// While the guard exists, it indicates to the pool that the item the guard references is +/// currently being accessed. If the item is removed from the pool while the guard exists, the +/// removal will be deferred until all guards are dropped. +/// +/// Unlike [`Ref`], which borrows the pool, an `OwnedRef` clones the `Arc` +/// around the pool. Therefore, it keeps the pool from being dropped until all +/// such guards have been dropped. This means that an `OwnedRef` may be held for +/// an arbitrary lifetime. +/// +/// +/// # Examples +/// +/// ``` +/// # use sharded_slab::Pool; +/// use std::sync::Arc; +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); +/// +/// // Look up the created `Key`, returning an `OwnedRef`. +/// let value = pool.clone().get_owned(key).unwrap(); +/// +/// // Now, the original `Arc` clone of the pool may be dropped, but the +/// // returned `OwnedRef` can still access the value. +/// assert_eq!(value, String::from("hello world")); +/// ``` +/// +/// Unlike [`Ref`], an `OwnedRef` may be stored in a struct which must live +/// for the `'static` lifetime: +/// +/// ``` +/// # use sharded_slab::Pool; +/// use sharded_slab::pool::OwnedRef; +/// use std::sync::Arc; +/// +/// pub struct MyStruct { +/// pool_ref: OwnedRef<String>, +/// // ... other fields ... +/// } +/// +/// // Suppose this is some arbitrary function which requires a value that +/// // lives for the 'static lifetime... +/// fn function_requiring_static<T: 'static>(t: &T) { +/// // ... do something extremely important and interesting ... +/// } +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); +/// +/// // Look up the created `Key`, returning an `OwnedRef`. +/// let pool_ref = pool.clone().get_owned(key).unwrap(); +/// let my_struct = MyStruct { +/// pool_ref, +/// // ... +/// }; +/// +/// // We can use `my_struct` anywhere where it is required to have the +/// // `'static` lifetime: +/// function_requiring_static(&my_struct); +/// ``` +/// +/// `OwnedRef`s may be sent between threads: +/// +/// ``` +/// # use sharded_slab::Pool; +/// use std::{thread, sync::Arc}; +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); +/// +/// // Look up the created `Key`, returning an `OwnedRef`. +/// let value = pool.clone().get_owned(key).unwrap(); +/// +/// thread::spawn(move || { +/// assert_eq!(value, String::from("hello world")); +/// // ... +/// }).join().unwrap(); +/// ``` +/// +/// [`Ref`]: crate::pool::Ref +pub struct OwnedRef<T, C = DefaultConfig> +where + T: Clear + Default, + C: cfg::Config, +{ + inner: page::slot::Guard<T, C>, + pool: Arc<Pool<T, C>>, + key: usize, +} + +/// An owned guard that allows exclusive, mutable access to an object in a pool. +/// +/// An `OwnedRefMut<T>` functions more or less identically to an owned +/// `Box<T>`: it can be passed to functions, stored in structure fields, and +/// borrowed mutably or immutably, and can be owned for arbitrary lifetimes. +/// The difference is that, unlike a `Box<T>`, the memory allocation for the +/// `T` lives in the `Pool`; when an `OwnedRefMut` is created, it may reuse +/// memory that was allocated for a previous pooled object that has been +/// cleared. Additionally, the `OwnedRefMut` may be [downgraded] to an +/// [`OwnedRef`] which may be shared freely, essentially turning the `Box` +/// into an `Arc`. +/// +/// This is returned by [`Pool::create_owned`]. +/// +/// While the guard exists, it indicates to the pool that the item the guard +/// references is currently being accessed. If the item is removed from the pool +/// while the guard exists, theremoval will be deferred until all guards are +/// dropped. +/// +/// Unlike [`RefMut`], which borrows the pool, an `OwnedRefMut` clones the `Arc` +/// around the pool. Therefore, it keeps the pool from being dropped until all +/// such guards have been dropped. This means that an `OwnedRefMut` may be held for +/// an arbitrary lifetime. +/// +/// # Examples +/// +/// ```rust +/// # use sharded_slab::Pool; +/// # use std::thread; +/// use std::sync::Arc; +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// +/// // Create a new pooled item, returning an owned guard that allows mutable +/// // access to the new item. +/// let mut item = pool.clone().create_owned().unwrap(); +/// // Return a key that allows indexing the created item once the guard +/// // has been dropped. +/// let key = item.key(); +/// +/// // Mutate the item. +/// item.push_str("Hello"); +/// // Drop the guard, releasing mutable access to the new item. +/// drop(item); +/// +/// /// Other threads may now (immutably) access the item using the returned key. +/// thread::spawn(move || { +/// assert_eq!(pool.get(key).unwrap(), String::from("Hello")); +/// }).join().unwrap(); +/// ``` +/// +/// ```rust +/// # use sharded_slab::Pool; +/// use std::sync::Arc; +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// +/// // Create a new item, returning an owned, mutable guard. +/// let mut value = pool.clone().create_owned().unwrap(); +/// +/// // Now, the original `Arc` clone of the pool may be dropped, but the +/// // returned `OwnedRefMut` can still access the value. +/// drop(pool); +/// +/// value.push_str("hello world"); +/// assert_eq!(value, String::from("hello world")); +/// ``` +/// +/// Unlike [`RefMut`], an `OwnedRefMut` may be stored in a struct which must live +/// for the `'static` lifetime: +/// +/// ``` +/// # use sharded_slab::Pool; +/// use sharded_slab::pool::OwnedRefMut; +/// use std::sync::Arc; +/// +/// pub struct MyStruct { +/// pool_ref: OwnedRefMut<String>, +/// // ... other fields ... +/// } +/// +/// // Suppose this is some arbitrary function which requires a value that +/// // lives for the 'static lifetime... +/// fn function_requiring_static<T: 'static>(t: &T) { +/// // ... do something extremely important and interesting ... +/// } +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// +/// // Create a new item, returning a mutable owned reference. +/// let pool_ref = pool.clone().create_owned().unwrap(); +/// +/// let my_struct = MyStruct { +/// pool_ref, +/// // ... +/// }; +/// +/// // We can use `my_struct` anywhere where it is required to have the +/// // `'static` lifetime: +/// function_requiring_static(&my_struct); +/// ``` +/// +/// `OwnedRefMut`s may be sent between threads: +/// +/// ``` +/// # use sharded_slab::Pool; +/// use std::{thread, sync::Arc}; +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// +/// let mut value = pool.clone().create_owned().unwrap(); +/// let key = value.key(); +/// +/// thread::spawn(move || { +/// value.push_str("hello world"); +/// // ... +/// }).join().unwrap(); +/// +/// // Once the `OwnedRefMut` has been dropped by the other thread, we may +/// // now access the value immutably on this thread. +/// +/// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); +/// ``` +/// +/// Downgrading from a mutable to an immutable reference: +/// +/// ``` +/// # use sharded_slab::Pool; +/// use std::{thread, sync::Arc}; +/// +/// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); +/// +/// let mut value = pool.clone().create_owned().unwrap(); +/// let key = value.key(); +/// value.push_str("hello world"); +/// +/// // Downgrade the mutable owned ref to an immutable owned ref. +/// let value = value.downgrade(); +/// +/// // Once the `OwnedRefMut` has been downgraded, other threads may +/// // immutably access the pooled value: +/// thread::spawn(move || { +/// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); +/// }).join().unwrap(); +/// +/// // This thread can still access the pooled value through the +/// // immutable owned ref: +/// assert_eq!(value, String::from("hello world")); +/// ``` +/// +/// [`Pool::create_owned`]: crate::Pool::create_owned +/// [`RefMut`]: crate::pool::RefMut +/// [`OwnedRefMut`]: crate::pool::OwnedRefMut +/// [downgraded]: crate::pool::OwnedRefMut::downgrade +pub struct OwnedRefMut<T, C = DefaultConfig> +where + T: Clear + Default, + C: cfg::Config, +{ + inner: page::slot::InitGuard<T, C>, + pool: Arc<Pool<T, C>>, + key: usize, +} + +impl<T> Pool<T> +where + T: Clear + Default, +{ + /// Returns a new `Pool` with the default configuration parameters. + pub fn new() -> Self { + Self::new_with_config() + } + + /// Returns a new `Pool` with the provided configuration parameters. + pub fn new_with_config<C: cfg::Config>() -> Pool<T, C> { + C::validate(); + Pool { + shards: shard::Array::new(), + _cfg: PhantomData, + } + } +} + +impl<T, C> Pool<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + /// The number of bits in each index which are used by the pool. + /// + /// If other data is packed into the `usize` indices returned by + /// [`Pool::create`], user code is free to use any bits higher than the + /// `USED_BITS`-th bit freely. + /// + /// This is determined by the [`Config`] type that configures the pool's + /// parameters. By default, all bits are used; this can be changed by + /// overriding the [`Config::RESERVED_BITS`][res] constant. + /// + /// [`Config`]: trait.Config.html + /// [res]: trait.Config.html#associatedconstant.RESERVED_BITS + /// [`Slab::insert`]: struct.Slab.html#method.insert + pub const USED_BITS: usize = C::USED_BITS; + + /// Creates a new object in the pool, returning an [`RefMut`] guard that + /// may be used to mutate the new object. + /// + /// If this function returns `None`, then the shard for the current thread is full and no items + /// can be added until some are removed, or the maximum number of shards has been reached. + /// + /// # Examples + /// ```rust + /// # use sharded_slab::Pool; + /// # use std::thread; + /// let pool: Pool<String> = Pool::new(); + /// + /// // Create a new pooled item, returning a guard that allows mutable + /// // access to the new item. + /// let mut item = pool.create().unwrap(); + /// // Return a key that allows indexing the created item once the guard + /// // has been dropped. + /// let key = item.key(); + /// + /// // Mutate the item. + /// item.push_str("Hello"); + /// // Drop the guard, releasing mutable access to the new item. + /// drop(item); + /// + /// /// Other threads may now (immutably) access the item using the returned key. + /// thread::spawn(move || { + /// assert_eq!(pool.get(key).unwrap(), String::from("Hello")); + /// }).join().unwrap(); + /// ``` + /// + /// [`RefMut`]: crate::pool::RefMut + pub fn create(&self) -> Option<RefMut<'_, T, C>> { + let (tid, shard) = self.shards.current(); + test_println!("pool: create {:?}", tid); + let (key, inner) = shard.init_with(|idx, slot| { + let guard = slot.init()?; + let gen = guard.generation(); + Some((gen.pack(idx), guard)) + })?; + Some(RefMut { + inner, + key: tid.pack(key), + shard, + }) + } + + /// Creates a new object in the pool, returning an [`OwnedRefMut`] guard that + /// may be used to mutate the new object. + /// + /// If this function returns `None`, then the shard for the current thread + /// is full and no items can be added until some are removed, or the maximum + /// number of shards has been reached. + /// + /// Unlike [`create`], which borrows the pool, this method _clones_ the `Arc` + /// around the pool if a value exists for the given key. This means that the + /// returned [`OwnedRefMut`] can be held for an arbitrary lifetime. However, + /// this method requires that the pool itself be wrapped in an `Arc`. + /// + /// An `OwnedRefMut<T>` functions more or less identically to an owned + /// `Box<T>`: it can be passed to functions, stored in structure fields, and + /// borrowed mutably or immutably, and can be owned for arbitrary lifetimes. + /// The difference is that, unlike a `Box<T>`, the memory allocation for the + /// `T` lives in the `Pool`; when an `OwnedRefMut` is created, it may reuse + /// memory that was allocated for a previous pooled object that has been + /// cleared. Additionally, the `OwnedRefMut` may be [downgraded] to an + /// [`OwnedRef`] which may be shared freely, essentially turning the `Box` + /// into an `Arc`. + /// + /// # Examples + /// + /// ```rust + /// # use sharded_slab::Pool; + /// # use std::thread; + /// use std::sync::Arc; + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// + /// // Create a new pooled item, returning an owned guard that allows mutable + /// // access to the new item. + /// let mut item = pool.clone().create_owned().unwrap(); + /// // Return a key that allows indexing the created item once the guard + /// // has been dropped. + /// let key = item.key(); + /// + /// // Mutate the item. + /// item.push_str("Hello"); + /// // Drop the guard, releasing mutable access to the new item. + /// drop(item); + /// + /// /// Other threads may now (immutably) access the item using the returned key. + /// thread::spawn(move || { + /// assert_eq!(pool.get(key).unwrap(), String::from("Hello")); + /// }).join().unwrap(); + /// ``` + /// + /// ```rust + /// # use sharded_slab::Pool; + /// use std::sync::Arc; + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// + /// // Create a new item, returning an owned, mutable guard. + /// let mut value = pool.clone().create_owned().unwrap(); + /// + /// // Now, the original `Arc` clone of the pool may be dropped, but the + /// // returned `OwnedRefMut` can still access the value. + /// drop(pool); + /// + /// value.push_str("hello world"); + /// assert_eq!(value, String::from("hello world")); + /// ``` + /// + /// Unlike [`RefMut`], an `OwnedRefMut` may be stored in a struct which must live + /// for the `'static` lifetime: + /// + /// ``` + /// # use sharded_slab::Pool; + /// use sharded_slab::pool::OwnedRefMut; + /// use std::sync::Arc; + /// + /// pub struct MyStruct { + /// pool_ref: OwnedRefMut<String>, + /// // ... other fields ... + /// } + /// + /// // Suppose this is some arbitrary function which requires a value that + /// // lives for the 'static lifetime... + /// fn function_requiring_static<T: 'static>(t: &T) { + /// // ... do something extremely important and interesting ... + /// } + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// + /// // Create a new item, returning a mutable owned reference. + /// let pool_ref = pool.clone().create_owned().unwrap(); + /// + /// let my_struct = MyStruct { + /// pool_ref, + /// // ... + /// }; + /// + /// // We can use `my_struct` anywhere where it is required to have the + /// // `'static` lifetime: + /// function_requiring_static(&my_struct); + /// ``` + /// + /// `OwnedRefMut`s may be sent between threads: + /// + /// ``` + /// # use sharded_slab::Pool; + /// use std::{thread, sync::Arc}; + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// + /// let mut value = pool.clone().create_owned().unwrap(); + /// let key = value.key(); + /// + /// thread::spawn(move || { + /// value.push_str("hello world"); + /// // ... + /// }).join().unwrap(); + /// + /// // Once the `OwnedRefMut` has been dropped by the other thread, we may + /// // now access the value immutably on this thread. + /// + /// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); + /// ``` + /// + /// Downgrading from a mutable to an immutable reference: + /// + /// ``` + /// # use sharded_slab::Pool; + /// use std::{thread, sync::Arc}; + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// + /// let mut value = pool.clone().create_owned().unwrap(); + /// let key = value.key(); + /// value.push_str("hello world"); + /// + /// // Downgrade the mutable owned ref to an immutable owned ref. + /// let value = value.downgrade(); + /// + /// // Once the `OwnedRefMut` has been downgraded, other threads may + /// // immutably access the pooled value: + /// thread::spawn(move || { + /// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); + /// }).join().unwrap(); + /// + /// // This thread can still access the pooled value through the + /// // immutable owned ref: + /// assert_eq!(value, String::from("hello world")); + /// ``` + /// + /// [`create`]: Pool::create + /// [`OwnedRef`]: crate::pool::OwnedRef + /// [`RefMut`]: crate::pool::RefMut + /// [`OwnedRefMut`]: crate::pool::OwnedRefMut + /// [downgraded]: crate::pool::OwnedRefMut::downgrade + pub fn create_owned(self: Arc<Self>) -> Option<OwnedRefMut<T, C>> { + let (tid, shard) = self.shards.current(); + test_println!("pool: create_owned {:?}", tid); + let (inner, key) = shard.init_with(|idx, slot| { + let inner = slot.init()?; + let gen = inner.generation(); + Some((inner, tid.pack(gen.pack(idx)))) + })?; + Some(OwnedRefMut { + inner, + pool: self, + key, + }) + } + + /// Creates a new object in the pool with the provided initializer, + /// returning a key that may be used to access the new object. + /// + /// If this function returns `None`, then the shard for the current thread is full and no items + /// can be added until some are removed, or the maximum number of shards has been reached. + /// + /// # Examples + /// ```rust + /// # use sharded_slab::Pool; + /// # use std::thread; + /// let pool: Pool<String> = Pool::new(); + /// + /// // Create a new pooled item, returning its integer key. + /// let key = pool.create_with(|s| s.push_str("Hello")).unwrap(); + /// + /// /// Other threads may now (immutably) access the item using the key. + /// thread::spawn(move || { + /// assert_eq!(pool.get(key).unwrap(), String::from("Hello")); + /// }).join().unwrap(); + /// ``` + pub fn create_with(&self, init: impl FnOnce(&mut T)) -> Option<usize> { + test_println!("pool: create_with"); + let mut guard = self.create()?; + init(&mut guard); + Some(guard.key()) + } + + /// Return a borrowed reference to the value associated with the given key. + /// + /// If the pool does not contain a value for the given key, `None` is returned instead. + /// + /// # Examples + /// + /// ```rust + /// # use sharded_slab::Pool; + /// let pool: Pool<String> = Pool::new(); + /// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); + /// + /// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); + /// assert!(pool.get(12345).is_none()); + /// ``` + pub fn get(&self, key: usize) -> Option<Ref<'_, T, C>> { + let tid = C::unpack_tid(key); + + test_println!("pool: get{:?}; current={:?}", tid, Tid::<C>::current()); + let shard = self.shards.get(tid.as_usize())?; + let inner = shard.with_slot(key, |slot| slot.get(C::unpack_gen(key)))?; + Some(Ref { inner, shard, key }) + } + + /// Return an owned reference to the value associated with the given key. + /// + /// If the pool does not contain a value for the given key, `None` is + /// returned instead. + /// + /// Unlike [`get`], which borrows the pool, this method _clones_ the `Arc` + /// around the pool if a value exists for the given key. This means that the + /// returned [`OwnedRef`] can be held for an arbitrary lifetime. However, + /// this method requires that the pool itself be wrapped in an `Arc`. + /// + /// # Examples + /// + /// ```rust + /// # use sharded_slab::Pool; + /// use std::sync::Arc; + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); + /// + /// // Look up the created `Key`, returning an `OwnedRef`. + /// let value = pool.clone().get_owned(key).unwrap(); + /// + /// // Now, the original `Arc` clone of the pool may be dropped, but the + /// // returned `OwnedRef` can still access the value. + /// assert_eq!(value, String::from("hello world")); + /// ``` + /// + /// Unlike [`Ref`], an `OwnedRef` may be stored in a struct which must live + /// for the `'static` lifetime: + /// + /// ``` + /// # use sharded_slab::Pool; + /// use sharded_slab::pool::OwnedRef; + /// use std::sync::Arc; + /// + /// pub struct MyStruct { + /// pool_ref: OwnedRef<String>, + /// // ... other fields ... + /// } + /// + /// // Suppose this is some arbitrary function which requires a value that + /// // lives for the 'static lifetime... + /// fn function_requiring_static<T: 'static>(t: &T) { + /// // ... do something extremely important and interesting ... + /// } + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); + /// + /// // Look up the created `Key`, returning an `OwnedRef`. + /// let pool_ref = pool.clone().get_owned(key).unwrap(); + /// let my_struct = MyStruct { + /// pool_ref, + /// // ... + /// }; + /// + /// // We can use `my_struct` anywhere where it is required to have the + /// // `'static` lifetime: + /// function_requiring_static(&my_struct); + /// ``` + /// + /// `OwnedRef`s may be sent between threads: + /// + /// ``` + /// # use sharded_slab::Pool; + /// use std::{thread, sync::Arc}; + /// + /// let pool: Arc<Pool<String>> = Arc::new(Pool::new()); + /// let key = pool.create_with(|item| item.push_str("hello world")).unwrap(); + /// + /// // Look up the created `Key`, returning an `OwnedRef`. + /// let value = pool.clone().get_owned(key).unwrap(); + /// + /// thread::spawn(move || { + /// assert_eq!(value, String::from("hello world")); + /// // ... + /// }).join().unwrap(); + /// ``` + /// + /// [`get`]: Pool::get + /// [`OwnedRef`]: crate::pool::OwnedRef + /// [`Ref`]: crate::pool::Ref + pub fn get_owned(self: Arc<Self>, key: usize) -> Option<OwnedRef<T, C>> { + let tid = C::unpack_tid(key); + + test_println!("pool: get{:?}; current={:?}", tid, Tid::<C>::current()); + let shard = self.shards.get(tid.as_usize())?; + let inner = shard.with_slot(key, |slot| slot.get(C::unpack_gen(key)))?; + Some(OwnedRef { + inner, + pool: self.clone(), + key, + }) + } + + /// Remove the value using the storage associated with the given key from the pool, returning + /// `true` if the value was removed. + /// + /// This method does _not_ block the current thread until the value can be + /// cleared. Instead, if another thread is currently accessing that value, this marks it to be + /// cleared by that thread when it is done accessing that value. + /// + /// # Examples + /// + /// ```rust + /// # use sharded_slab::Pool; + /// let pool: Pool<String> = Pool::new(); + /// + /// // Check out an item from the pool. + /// let mut item = pool.create().unwrap(); + /// let key = item.key(); + /// item.push_str("hello world"); + /// drop(item); + /// + /// assert_eq!(pool.get(key).unwrap(), String::from("hello world")); + /// + /// pool.clear(key); + /// assert!(pool.get(key).is_none()); + /// ``` + /// + /// ``` + /// # use sharded_slab::Pool; + /// let pool: Pool<String> = Pool::new(); + /// + /// let key = pool.create_with(|item| item.push_str("Hello world!")).unwrap(); + /// + /// // Clearing a key that doesn't exist in the `Pool` will return `false` + /// assert_eq!(pool.clear(key + 69420), false); + /// + /// // Clearing a key that does exist returns `true` + /// assert!(pool.clear(key)); + /// + /// // Clearing a key that has previously been cleared will return `false` + /// assert_eq!(pool.clear(key), false); + /// ``` + /// [`clear`]: #method.clear + pub fn clear(&self, key: usize) -> bool { + let tid = C::unpack_tid(key); + + let shard = self.shards.get(tid.as_usize()); + if tid.is_current() { + shard + .map(|shard| shard.mark_clear_local(key)) + .unwrap_or(false) + } else { + shard + .map(|shard| shard.mark_clear_remote(key)) + .unwrap_or(false) + } + } +} + +unsafe impl<T, C> Send for Pool<T, C> +where + T: Send + Clear + Default, + C: cfg::Config, +{ +} +unsafe impl<T, C> Sync for Pool<T, C> +where + T: Sync + Clear + Default, + C: cfg::Config, +{ +} + +impl<T> Default for Pool<T> +where + T: Clear + Default, +{ + fn default() -> Self { + Self::new() + } +} + +impl<T, C> fmt::Debug for Pool<T, C> +where + T: fmt::Debug + Clear + Default, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Pool") + .field("shards", &self.shards) + .field("config", &C::debug()) + .finish() + } +} + +// === impl Ref === + +impl<'a, T, C> Ref<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + /// Returns the key used to access this guard + pub fn key(&self) -> usize { + self.key + } + + #[inline] + fn value(&self) -> &T { + unsafe { + // Safety: calling `slot::Guard::value` is unsafe, since the `Guard` + // value contains a pointer to the slot that may outlive the slab + // containing that slot. Here, the `Ref` has a borrowed reference to + // the shard containing that slot, which ensures that the slot will + // not be dropped while this `Guard` exists. + self.inner.value() + } + } +} + +impl<'a, T, C> std::ops::Deref for Ref<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +impl<'a, T, C> Drop for Ref<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + fn drop(&mut self) { + test_println!("drop Ref: try clearing data"); + let should_clear = unsafe { + // Safety: calling `slot::Guard::release` is unsafe, since the + // `Guard` value contains a pointer to the slot that may outlive the + // slab containing that slot. Here, the `Ref` guard owns a + // borrowed reference to the shard containing that slot, which + // ensures that the slot will not be dropped while this `Ref` + // exists. + self.inner.release() + }; + if should_clear { + self.shard.clear_after_release(self.key); + } + } +} + +impl<'a, T, C> fmt::Debug for Ref<'a, T, C> +where + T: fmt::Debug + Clear + Default, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self.value(), f) + } +} + +impl<'a, T, C> PartialEq<T> for Ref<'a, T, C> +where + T: PartialEq<T> + Clear + Default, + C: cfg::Config, +{ + fn eq(&self, other: &T) -> bool { + *self.value() == *other + } +} + +// === impl GuardMut === + +impl<'a, T, C: cfg::Config> RefMut<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + /// Returns the key used to access the guard. + pub fn key(&self) -> usize { + self.key + } + + /// Downgrades the mutable guard to an immutable guard, allowing access to + /// the pooled value from other threads. + /// + /// ## Examples + /// + /// ``` + /// # use sharded_slab::Pool; + /// # use std::{sync::Arc, thread}; + /// let pool = Arc::new(Pool::<String>::new()); + /// + /// let mut guard_mut = pool.clone().create_owned().unwrap(); + /// let key = guard_mut.key(); + /// guard_mut.push_str("Hello"); + /// + /// // The pooled string is currently borrowed mutably, so other threads + /// // may not access it. + /// let pool2 = pool.clone(); + /// thread::spawn(move || { + /// assert!(pool2.get(key).is_none()) + /// }).join().unwrap(); + /// + /// // Downgrade the guard to an immutable reference. + /// let guard = guard_mut.downgrade(); + /// + /// // Now, other threads may also access the pooled value. + /// let pool2 = pool.clone(); + /// thread::spawn(move || { + /// let guard = pool2.get(key) + /// .expect("the item may now be referenced by other threads"); + /// assert_eq!(guard, String::from("Hello")); + /// }).join().unwrap(); + /// + /// // We can still access the value immutably through the downgraded guard. + /// assert_eq!(guard, String::from("Hello")); + /// ``` + pub fn downgrade(mut self) -> Ref<'a, T, C> { + let inner = unsafe { self.inner.downgrade() }; + Ref { + inner, + shard: self.shard, + key: self.key, + } + } + + #[inline] + fn value(&self) -> &T { + unsafe { + // Safety: we are holding a reference to the shard which keeps the + // pointed slot alive. The returned reference will not outlive + // `self`. + self.inner.value() + } + } +} + +impl<'a, T, C: cfg::Config> std::ops::Deref for RefMut<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +impl<'a, T, C> std::ops::DerefMut for RefMut<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { + // Safety: we are holding a reference to the shard which keeps the + // pointed slot alive. The returned reference will not outlive `self`. + self.inner.value_mut() + } + } +} + +impl<'a, T, C> Drop for RefMut<'a, T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + fn drop(&mut self) { + test_println!(" -> drop RefMut: try clearing data"); + let should_clear = unsafe { + // Safety: we are holding a reference to the shard which keeps the + // pointed slot alive. The returned reference will not outlive `self`. + self.inner.release() + }; + if should_clear { + self.shard.clear_after_release(self.key); + } + } +} + +impl<'a, T, C> fmt::Debug for RefMut<'a, T, C> +where + T: fmt::Debug + Clear + Default, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self.value(), f) + } +} + +impl<'a, T, C> PartialEq<T> for RefMut<'a, T, C> +where + T: PartialEq<T> + Clear + Default, + C: cfg::Config, +{ + fn eq(&self, other: &T) -> bool { + self.value().eq(other) + } +} + +// === impl OwnedRef === + +impl<T, C> OwnedRef<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + /// Returns the key used to access this guard + pub fn key(&self) -> usize { + self.key + } + + #[inline] + fn value(&self) -> &T { + unsafe { + // Safety: calling `slot::Guard::value` is unsafe, since the `Guard` + // value contains a pointer to the slot that may outlive the slab + // containing that slot. Here, the `Ref` has a borrowed reference to + // the shard containing that slot, which ensures that the slot will + // not be dropped while this `Guard` exists. + self.inner.value() + } + } +} + +impl<T, C> std::ops::Deref for OwnedRef<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +impl<T, C> Drop for OwnedRef<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + fn drop(&mut self) { + test_println!("drop OwnedRef: try clearing data"); + let should_clear = unsafe { + // Safety: calling `slot::Guard::release` is unsafe, since the + // `Guard` value contains a pointer to the slot that may outlive the + // slab containing that slot. Here, the `OwnedRef` owns an `Arc` + // clone of the pool, which keeps it alive as long as the `OwnedRef` + // exists. + self.inner.release() + }; + if should_clear { + let shard_idx = Tid::<C>::from_packed(self.key); + test_println!("-> shard={:?}", shard_idx); + if let Some(shard) = self.pool.shards.get(shard_idx.as_usize()) { + shard.clear_after_release(self.key); + } else { + test_println!("-> shard={:?} does not exist! THIS IS A BUG", shard_idx); + debug_assert!(std::thread::panicking(), "[internal error] tried to drop an `OwnedRef` to a slot on a shard that never existed!"); + } + } + } +} + +impl<T, C> fmt::Debug for OwnedRef<T, C> +where + T: fmt::Debug + Clear + Default, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self.value(), f) + } +} + +impl<T, C> PartialEq<T> for OwnedRef<T, C> +where + T: PartialEq<T> + Clear + Default, + C: cfg::Config, +{ + fn eq(&self, other: &T) -> bool { + *self.value() == *other + } +} + +unsafe impl<T, C> Sync for OwnedRef<T, C> +where + T: Sync + Clear + Default, + C: cfg::Config, +{ +} + +unsafe impl<T, C> Send for OwnedRef<T, C> +where + T: Sync + Clear + Default, + C: cfg::Config, +{ +} + +// === impl OwnedRefMut === + +impl<T, C> OwnedRefMut<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + /// Returns the key used to access this guard + pub fn key(&self) -> usize { + self.key + } + + /// Downgrades the owned mutable guard to an owned immutable guard, allowing + /// access to the pooled value from other threads. + /// + /// ## Examples + /// + /// ``` + /// # use sharded_slab::Pool; + /// # use std::{sync::Arc, thread}; + /// let pool = Arc::new(Pool::<String>::new()); + /// + /// let mut guard_mut = pool.clone().create_owned().unwrap(); + /// let key = guard_mut.key(); + /// guard_mut.push_str("Hello"); + /// + /// // The pooled string is currently borrowed mutably, so other threads + /// // may not access it. + /// let pool2 = pool.clone(); + /// thread::spawn(move || { + /// assert!(pool2.get(key).is_none()) + /// }).join().unwrap(); + /// + /// // Downgrade the guard to an immutable reference. + /// let guard = guard_mut.downgrade(); + /// + /// // Now, other threads may also access the pooled value. + /// let pool2 = pool.clone(); + /// thread::spawn(move || { + /// let guard = pool2.get(key) + /// .expect("the item may now be referenced by other threads"); + /// assert_eq!(guard, String::from("Hello")); + /// }).join().unwrap(); + /// + /// // We can still access the value immutably through the downgraded guard. + /// assert_eq!(guard, String::from("Hello")); + /// ``` + pub fn downgrade(mut self) -> OwnedRef<T, C> { + let inner = unsafe { self.inner.downgrade() }; + OwnedRef { + inner, + pool: self.pool.clone(), + key: self.key, + } + } + + fn shard(&self) -> Option<&Shard<T, C>> { + let shard_idx = Tid::<C>::from_packed(self.key); + test_println!("-> shard={:?}", shard_idx); + self.pool.shards.get(shard_idx.as_usize()) + } + + #[inline] + fn value(&self) -> &T { + unsafe { + // Safety: calling `slot::InitGuard::value` is unsafe, since the `Guard` + // value contains a pointer to the slot that may outlive the slab + // containing that slot. Here, the `OwnedRefMut` has an `Arc` clone of + // the shard containing that slot, which ensures that the slot will + // not be dropped while this `Guard` exists. + self.inner.value() + } + } +} + +impl<T, C> std::ops::Deref for OwnedRefMut<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +impl<T, C> std::ops::DerefMut for OwnedRefMut<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { + // Safety: calling `slot::InitGuard::value_mut` is unsafe, since the + // `Guard` value contains a pointer to the slot that may outlive + // the slab containing that slot. Here, the `OwnedRefMut` has an + // `Arc` clone of the shard containing that slot, which ensures that + // the slot will not be dropped while this `Guard` exists. + self.inner.value_mut() + } + } +} + +impl<T, C> Drop for OwnedRefMut<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + fn drop(&mut self) { + test_println!("drop OwnedRefMut: try clearing data"); + let should_clear = unsafe { + // Safety: calling `slot::Guard::release` is unsafe, since the + // `Guard` value contains a pointer to the slot that may outlive the + // slab containing that slot. Here, the `OwnedRefMut` owns an `Arc` + // clone of the pool, which keeps it alive as long as the + // `OwnedRefMut` exists. + self.inner.release() + }; + if should_clear { + if let Some(shard) = self.shard() { + shard.clear_after_release(self.key); + } else { + test_println!("-> shard does not exist! THIS IS A BUG"); + debug_assert!(std::thread::panicking(), "[internal error] tried to drop an `OwnedRefMut` to a slot on a shard that never existed!"); + } + } + } +} + +impl<T, C> fmt::Debug for OwnedRefMut<T, C> +where + T: fmt::Debug + Clear + Default, + C: cfg::Config, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self.value(), f) + } +} + +impl<T, C> PartialEq<T> for OwnedRefMut<T, C> +where + T: PartialEq<T> + Clear + Default, + C: cfg::Config, +{ + fn eq(&self, other: &T) -> bool { + *self.value() == *other + } +} + +unsafe impl<T, C> Sync for OwnedRefMut<T, C> +where + T: Sync + Clear + Default, + C: cfg::Config, +{ +} + +unsafe impl<T, C> Send for OwnedRefMut<T, C> +where + T: Sync + Clear + Default, + C: cfg::Config, +{ +} diff --git a/vendor/sharded-slab/src/shard.rs b/vendor/sharded-slab/src/shard.rs new file mode 100644 index 000000000..0d054d7e5 --- /dev/null +++ b/vendor/sharded-slab/src/shard.rs @@ -0,0 +1,432 @@ +use crate::{ + cfg::{self, CfgPrivate}, + clear::Clear, + page, + sync::{ + alloc, + atomic::{ + AtomicPtr, AtomicUsize, + Ordering::{self, *}, + }, + }, + tid::Tid, + Pack, +}; + +use std::{fmt, ptr, slice}; + +// ┌─────────────┐ ┌────────┐ +// │ page 1 │ │ │ +// ├─────────────┤ ┌───▶│ next──┼─┐ +// │ page 2 │ │ ├────────┤ │ +// │ │ │ │XXXXXXXX│ │ +// │ local_free──┼─┘ ├────────┤ │ +// │ global_free─┼─┐ │ │◀┘ +// ├─────────────┤ └───▶│ next──┼─┐ +// │ page 3 │ ├────────┤ │ +// └─────────────┘ │XXXXXXXX│ │ +// ... ├────────┤ │ +// ┌─────────────┐ │XXXXXXXX│ │ +// │ page n │ ├────────┤ │ +// └─────────────┘ │ │◀┘ +// │ next──┼───▶ +// ├────────┤ +// │XXXXXXXX│ +// └────────┘ +// ... +pub(crate) struct Shard<T, C: cfg::Config> { + /// The shard's parent thread ID. + pub(crate) tid: usize, + /// The local free list for each page. + /// + /// These are only ever accessed from this shard's thread, so they are + /// stored separately from the shared state for the page that can be + /// accessed concurrently, to minimize false sharing. + local: Box<[page::Local]>, + /// The shared state for each page in this shard. + /// + /// This consists of the page's metadata (size, previous size), remote free + /// list, and a pointer to the actual array backing that page. + shared: Box<[page::Shared<T, C>]>, +} + +pub(crate) struct Array<T, C: cfg::Config> { + shards: Box<[Ptr<T, C>]>, + max: AtomicUsize, +} + +#[derive(Debug)] +struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>); + +#[derive(Debug)] +pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>); + +// === impl Shard === + +impl<T, C> Shard<T, C> +where + C: cfg::Config, +{ + #[inline(always)] + pub(crate) fn with_slot<'a, U>( + &'a self, + idx: usize, + f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>, + ) -> Option<U> { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + test_println!("-> {:?}", addr); + if page_index > self.shared.len() { + return None; + } + + self.shared[page_index].with_slot(addr, f) + } + + pub(crate) fn new(tid: usize) -> Self { + let mut total_sz = 0; + let shared = (0..C::MAX_PAGES) + .map(|page_num| { + let sz = C::page_size(page_num); + let prev_sz = total_sz; + total_sz += sz; + page::Shared::new(sz, prev_sz) + }) + .collect(); + let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect(); + Self { tid, local, shared } + } +} + +impl<T, C> Shard<Option<T>, C> +where + C: cfg::Config, +{ + /// Remove an item on the shard's local thread. + pub(crate) fn take_local(&self, idx: usize) -> Option<T> { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + test_println!("-> remove_local {:?}", addr); + + self.shared + .get(page_index)? + .take(addr, C::unpack_gen(idx), self.local(page_index)) + } + + /// Remove an item, while on a different thread from the shard's local thread. + pub(crate) fn take_remote(&self, idx: usize) -> Option<T> { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + debug_assert!(Tid::<C>::current().as_usize() != self.tid); + + let (addr, page_index) = page::indices::<C>(idx); + + test_println!("-> take_remote {:?}; page {:?}", addr, page_index); + + let shared = self.shared.get(page_index)?; + shared.take(addr, C::unpack_gen(idx), shared.free_list()) + } + + pub(crate) fn remove_local(&self, idx: usize) -> bool { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + if page_index > self.shared.len() { + return false; + } + + self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index)) + } + + pub(crate) fn remove_remote(&self, idx: usize) -> bool { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + if page_index > self.shared.len() { + return false; + } + + let shared = &self.shared[page_index]; + shared.remove(addr, C::unpack_gen(idx), shared.free_list()) + } + + pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> { + self.shared.iter() + } +} + +impl<T, C> Shard<T, C> +where + T: Clear + Default, + C: cfg::Config, +{ + pub(crate) fn init_with<U>( + &self, + mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>, + ) -> Option<U> { + // Can we fit the value into an exist`ing page? + for (page_idx, page) in self.shared.iter().enumerate() { + let local = self.local(page_idx); + + test_println!("-> page {}; {:?}; {:?}", page_idx, local, page); + + if let Some(res) = page.init_with(local, &mut init) { + return Some(res); + } + } + + None + } + + pub(crate) fn mark_clear_local(&self, idx: usize) -> bool { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + if page_index > self.shared.len() { + return false; + } + + self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index)) + } + + pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + if page_index > self.shared.len() { + return false; + } + + let shared = &self.shared[page_index]; + shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list()) + } + + pub(crate) fn clear_after_release(&self, idx: usize) { + crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire); + let tid = Tid::<C>::current().as_usize(); + test_println!( + "-> clear_after_release; self.tid={:?}; current.tid={:?};", + tid, + self.tid + ); + if tid == self.tid { + self.clear_local(idx); + } else { + self.clear_remote(idx); + } + } + + fn clear_local(&self, idx: usize) -> bool { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + if page_index > self.shared.len() { + return false; + } + + self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index)) + } + + fn clear_remote(&self, idx: usize) -> bool { + debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid); + let (addr, page_index) = page::indices::<C>(idx); + + if page_index > self.shared.len() { + return false; + } + + let shared = &self.shared[page_index]; + shared.clear(addr, C::unpack_gen(idx), shared.free_list()) + } + + #[inline(always)] + fn local(&self, i: usize) -> &page::Local { + #[cfg(debug_assertions)] + debug_assert_eq_in_drop!( + Tid::<C>::current().as_usize(), + self.tid, + "tried to access local data from another thread!" + ); + + &self.local[i] + } +} + +impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut d = f.debug_struct("Shard"); + + #[cfg(debug_assertions)] + d.field("tid", &self.tid); + d.field("shared", &self.shared).finish() + } +} + +// === impl Array === + +impl<T, C> Array<T, C> +where + C: cfg::Config, +{ + pub(crate) fn new() -> Self { + let mut shards = Vec::with_capacity(C::MAX_SHARDS); + for _ in 0..C::MAX_SHARDS { + // XXX(eliza): T_T this could be avoided with maybeuninit or something... + shards.push(Ptr::null()); + } + Self { + shards: shards.into(), + max: AtomicUsize::new(0), + } + } + + #[inline] + pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> { + test_println!("-> get shard={}", idx); + self.shards.get(idx)?.load(Acquire) + } + + #[inline] + pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) { + let tid = Tid::<C>::current(); + test_println!("current: {:?}", tid); + let idx = tid.as_usize(); + assert!( + idx < self.shards.len(), + "Thread count overflowed the configured max count. \ + Thread index = {}, max threads = {}.", + idx, + C::MAX_SHARDS, + ); + // It's okay for this to be relaxed. The value is only ever stored by + // the thread that corresponds to the index, and we are that thread. + let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| { + let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx)))); + test_println!("-> allocated new shard for index {} at {:p}", idx, ptr); + self.shards[idx].set(ptr); + let mut max = self.max.load(Acquire); + while max < idx { + match self.max.compare_exchange(max, idx, AcqRel, Acquire) { + Ok(_) => break, + Err(actual) => max = actual, + } + } + test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max); + unsafe { + // Safety: we just put it there! + &*ptr + } + .get_ref() + }); + (tid, shard) + } + + pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> { + test_println!("Array::iter_mut"); + let max = self.max.load(Acquire); + test_println!("-> highest index={}", max); + IterMut(self.shards[0..=max].iter_mut()) + } +} + +impl<T, C: cfg::Config> Drop for Array<T, C> { + fn drop(&mut self) { + // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`... + let max = self.max.load(Acquire); + for shard in &self.shards[0..=max] { + // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`... + let ptr = shard.0.load(Acquire); + if ptr.is_null() { + continue; + } + let shard = unsafe { + // Safety: this is the only place where these boxes are + // deallocated, and we have exclusive access to the shard array, + // because...we are dropping it... + Box::from_raw(ptr) + }; + drop(shard) + } + } +} + +impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let max = self.max.load(Acquire); + let mut set = f.debug_map(); + for shard in &self.shards[0..=max] { + let ptr = shard.0.load(Acquire); + if let Some(shard) = ptr::NonNull::new(ptr) { + set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() }); + } else { + set.entry(&format_args!("{:p}", ptr), &()); + } + } + set.finish() + } +} + +// === impl Ptr === + +impl<T, C: cfg::Config> Ptr<T, C> { + #[inline] + fn null() -> Self { + Self(AtomicPtr::new(ptr::null_mut())) + } + + #[inline] + fn load(&self, order: Ordering) -> Option<&Shard<T, C>> { + let ptr = self.0.load(order); + test_println!("---> loaded={:p} (order={:?})", ptr, order); + if ptr.is_null() { + test_println!("---> null"); + return None; + } + let track = unsafe { + // Safety: The returned reference will have the same lifetime as the + // reference to the shard pointer, which (morally, if not actually) + // owns the shard. The shard is only deallocated when the shard + // array is dropped, and it won't be dropped while this pointer is + // borrowed --- and the returned reference has the same lifetime. + // + // We know that the pointer is not null, because we just + // null-checked it immediately prior. + &*ptr + }; + + Some(track.get_ref()) + } + + #[inline] + fn set(&self, new: *mut alloc::Track<Shard<T, C>>) { + self.0 + .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire) + .expect("a shard can only be inserted by the thread that owns it, this is a bug!"); + } +} + +// === Iterators === + +impl<'a, T, C> Iterator for IterMut<'a, T, C> +where + T: 'a, + C: cfg::Config + 'a, +{ + type Item = &'a Shard<T, C>; + fn next(&mut self) -> Option<Self::Item> { + test_println!("IterMut::next"); + loop { + // Skip over empty indices if they are less than the highest + // allocated shard. Some threads may have accessed the slab + // (generating a thread ID) but never actually inserted data, so + // they may have never allocated a shard. + let next = self.0.next(); + test_println!("-> next.is_some={}", next.is_some()); + if let Some(shard) = next?.load(Acquire) { + test_println!("-> done"); + return Some(shard); + } + } + } +} diff --git a/vendor/sharded-slab/src/sync.rs b/vendor/sharded-slab/src/sync.rs new file mode 100644 index 000000000..64a31dcd9 --- /dev/null +++ b/vendor/sharded-slab/src/sync.rs @@ -0,0 +1,140 @@ +pub(crate) use self::inner::*; + +#[cfg(all(loom, any(test, feature = "loom")))] +mod inner { + pub(crate) mod atomic { + pub use loom::sync::atomic::*; + pub use std::sync::atomic::Ordering; + } + pub(crate) use loom::{ + cell::UnsafeCell, hint, lazy_static, sync::Mutex, thread::yield_now, thread_local, + }; + + pub(crate) mod alloc { + #![allow(dead_code)] + use loom::alloc; + use std::fmt; + /// Track allocations, detecting leaks + /// + /// This is a version of `loom::alloc::Track` that adds a missing + /// `Default` impl. + pub struct Track<T>(alloc::Track<T>); + + impl<T> Track<T> { + /// Track a value for leaks + #[inline(always)] + pub fn new(value: T) -> Track<T> { + Track(alloc::Track::new(value)) + } + + /// Get a reference to the value + #[inline(always)] + pub fn get_ref(&self) -> &T { + self.0.get_ref() + } + + /// Get a mutable reference to the value + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + + /// Stop tracking the value for leaks + #[inline(always)] + pub fn into_inner(self) -> T { + self.0.into_inner() + } + } + + impl<T: fmt::Debug> fmt::Debug for Track<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } + } + + impl<T: Default> Default for Track<T> { + fn default() -> Self { + Self::new(T::default()) + } + } + } +} + +#[cfg(not(all(loom, any(feature = "loom", test))))] +mod inner { + #![allow(dead_code)] + pub(crate) use lazy_static::lazy_static; + pub(crate) use std::{ + sync::{atomic, Mutex}, + thread::yield_now, + thread_local, + }; + + pub(crate) mod hint { + #[inline(always)] + pub(crate) fn spin_loop() { + // MSRV: std::hint::spin_loop() stabilized in 1.49.0 + #[allow(deprecated)] + super::atomic::spin_loop_hint() + } + } + + #[derive(Debug)] + pub(crate) struct UnsafeCell<T>(std::cell::UnsafeCell<T>); + + impl<T> UnsafeCell<T> { + pub fn new(data: T) -> UnsafeCell<T> { + UnsafeCell(std::cell::UnsafeCell::new(data)) + } + + #[inline(always)] + pub fn with<F, R>(&self, f: F) -> R + where + F: FnOnce(*const T) -> R, + { + f(self.0.get()) + } + + #[inline(always)] + pub fn with_mut<F, R>(&self, f: F) -> R + where + F: FnOnce(*mut T) -> R, + { + f(self.0.get()) + } + } + + pub(crate) mod alloc { + /// Track allocations, detecting leaks + #[derive(Debug, Default)] + pub struct Track<T> { + value: T, + } + + impl<T> Track<T> { + /// Track a value for leaks + #[inline(always)] + pub fn new(value: T) -> Track<T> { + Track { value } + } + + /// Get a reference to the value + #[inline(always)] + pub fn get_ref(&self) -> &T { + &self.value + } + + /// Get a mutable reference to the value + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + &mut self.value + } + + /// Stop tracking the value for leaks + #[inline(always)] + pub fn into_inner(self) -> T { + self.value + } + } + } +} diff --git a/vendor/sharded-slab/src/tests/loom_pool.rs b/vendor/sharded-slab/src/tests/loom_pool.rs new file mode 100644 index 000000000..d7df50552 --- /dev/null +++ b/vendor/sharded-slab/src/tests/loom_pool.rs @@ -0,0 +1,641 @@ +use super::util::*; +use crate::{clear::Clear, sync::alloc, Pack, Pool}; +use loom::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Condvar, Mutex, + }, + thread, +}; +use std::sync::Arc; + +#[derive(Default, Debug)] +struct State { + is_dropped: AtomicBool, + is_cleared: AtomicBool, + id: usize, +} + +impl State { + fn assert_clear(&self) { + assert!(!self.is_dropped.load(Ordering::SeqCst)); + assert!(self.is_cleared.load(Ordering::SeqCst)); + } + + fn assert_not_clear(&self) { + assert!(!self.is_dropped.load(Ordering::SeqCst)); + assert!(!self.is_cleared.load(Ordering::SeqCst)); + } +} + +impl PartialEq for State { + fn eq(&self, other: &State) -> bool { + self.id.eq(&other.id) + } +} + +#[derive(Default, Debug)] +struct DontDropMe(Arc<State>); + +impl PartialEq for DontDropMe { + fn eq(&self, other: &DontDropMe) -> bool { + self.0.eq(&other.0) + } +} + +impl DontDropMe { + fn new(id: usize) -> (Arc<State>, Self) { + let state = Arc::new(State { + is_dropped: AtomicBool::new(false), + is_cleared: AtomicBool::new(false), + id, + }); + (state.clone(), Self(state)) + } +} + +impl Drop for DontDropMe { + fn drop(&mut self) { + test_println!("-> DontDropMe drop: dropping data {:?}", self.0.id); + self.0.is_dropped.store(true, Ordering::SeqCst) + } +} + +impl Clear for DontDropMe { + fn clear(&mut self) { + test_println!("-> DontDropMe clear: clearing data {:?}", self.0.id); + self.0.is_cleared.store(true, Ordering::SeqCst); + } +} + +#[test] +fn dont_drop() { + run_model("dont_drop", || { + let pool: Pool<DontDropMe> = Pool::new(); + let (item1, value) = DontDropMe::new(1); + test_println!("-> dont_drop: Inserting into pool {}", item1.id); + let idx = pool + .create_with(move |item| *item = value) + .expect("create_with"); + + item1.assert_not_clear(); + + test_println!("-> dont_drop: clearing idx: {}", idx); + pool.clear(idx); + + item1.assert_clear(); + }); +} + +#[test] +fn concurrent_create_with_clear() { + run_model("concurrent_create_with_clear", || { + let pool: Arc<Pool<DontDropMe>> = Arc::new(Pool::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let (item1, value) = DontDropMe::new(1); + let idx1 = pool + .create_with(move |item| *item = value) + .expect("create_with"); + let p = pool.clone(); + let pair2 = pair.clone(); + let test_value = item1.clone(); + let t1 = thread::spawn(move || { + let (lock, cvar) = &*pair2; + test_println!("-> making get request"); + assert_eq!(p.get(idx1).unwrap().0.id, test_value.id); + let mut next = lock.lock().unwrap(); + *next = Some(()); + cvar.notify_one(); + }); + + test_println!("-> making get request"); + let guard = pool.get(idx1); + + let (lock, cvar) = &*pair; + let mut next = lock.lock().unwrap(); + // wait until we have a guard on the other thread. + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + // the item should be marked (clear returns true)... + assert!(pool.clear(idx1)); + // ...but the value shouldn't be removed yet. + item1.assert_not_clear(); + + t1.join().expect("thread 1 unable to join"); + + drop(guard); + item1.assert_clear(); + }) +} + +#[test] +fn racy_clear() { + run_model("racy_clear", || { + let pool = Arc::new(Pool::new()); + let (item, value) = DontDropMe::new(1); + + let idx = pool + .create_with(move |item| *item = value) + .expect("create_with"); + assert_eq!(pool.get(idx).unwrap().0.id, item.id); + + let p = pool.clone(); + let t2 = thread::spawn(move || p.clear(idx)); + let r1 = pool.clear(idx); + let r2 = t2.join().expect("thread 2 should not panic"); + + test_println!("r1: {}, r2: {}", r1, r2); + + assert!( + !(r1 && r2), + "Both threads should not have cleared the value" + ); + assert!(r1 || r2, "One thread should have removed the value"); + assert!(pool.get(idx).is_none()); + item.assert_clear(); + }) +} + +#[test] +fn clear_local_and_reuse() { + run_model("take_remote_and_reuse", || { + let pool = Arc::new(Pool::new_with_config::<TinyConfig>()); + + let idx1 = pool + .create_with(|item: &mut String| { + item.push_str("hello world"); + }) + .expect("create_with"); + let idx2 = pool + .create_with(|item| item.push_str("foo")) + .expect("create_with"); + let idx3 = pool + .create_with(|item| item.push_str("bar")) + .expect("create_with"); + + assert_eq!(pool.get(idx1).unwrap(), String::from("hello world")); + assert_eq!(pool.get(idx2).unwrap(), String::from("foo")); + assert_eq!(pool.get(idx3).unwrap(), String::from("bar")); + + let first = idx1 & (!crate::page::slot::Generation::<TinyConfig>::MASK); + assert!(pool.clear(idx1)); + + let idx1 = pool + .create_with(move |item| item.push_str("h")) + .expect("create_with"); + + let second = idx1 & (!crate::page::slot::Generation::<TinyConfig>::MASK); + assert_eq!(first, second); + assert!(pool.get(idx1).unwrap().capacity() >= 11); + }) +} + +#[test] +fn create_mut_guard_prevents_access() { + run_model("create_mut_guard_prevents_access", || { + let pool = Arc::new(Pool::<String>::new()); + let guard = pool.create().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + thread::spawn(move || { + assert!(pool2.get(key).is_none()); + }) + .join() + .unwrap(); + }); +} + +#[test] +fn create_mut_guard() { + run_model("create_mut_guard", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.create().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + guard.push_str("Hello world"); + drop(guard); + + t1.join().unwrap(); + }); +} + +#[test] +fn create_mut_guard_2() { + run_model("create_mut_guard_2", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.create().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let pool3 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + guard.push_str("Hello world"); + let t2 = thread::spawn(move || { + test_dbg!(pool3.get(key)); + }); + drop(guard); + + t1.join().unwrap(); + t2.join().unwrap(); + }); +} + +#[test] +fn create_mut_guard_downgrade() { + run_model("create_mut_guard_downgrade", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.create().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let pool3 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + guard.push_str("Hello world"); + let guard = guard.downgrade(); + let t2 = thread::spawn(move || { + test_dbg!(pool3.get(key)); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + assert_eq!(guard, "Hello world".to_owned()); + }); +} + +#[test] +fn create_mut_guard_downgrade_clear() { + run_model("create_mut_guard_downgrade_clear", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.create().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + + guard.push_str("Hello world"); + let guard = guard.downgrade(); + let pool3 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + let t2 = thread::spawn(move || { + test_dbg!(pool3.clear(key)); + }); + + assert_eq!(guard, "Hello world".to_owned()); + drop(guard); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert!(pool.get(key).is_none()); + }); +} + +#[test] +fn create_mut_downgrade_during_clear() { + run_model("create_mut_downgrade_during_clear", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.create().unwrap(); + let key: usize = guard.key(); + guard.push_str("Hello world"); + + let pool2 = pool.clone(); + let guard = guard.downgrade(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.clear(key)); + }); + + t1.join().unwrap(); + + assert_eq!(guard, "Hello world".to_owned()); + drop(guard); + + assert!(pool.get(key).is_none()); + }); +} + +#[test] +fn ownedref_send_out_of_local() { + run_model("ownedref_send_out_of_local", || { + let pool = Arc::new(Pool::<alloc::Track<String>>::new()); + let key1 = pool + .create_with(|item| item.get_mut().push_str("hello")) + .expect("create item 1"); + let key2 = pool + .create_with(|item| item.get_mut().push_str("goodbye")) + .expect("create item 2"); + + let item1 = pool.clone().get_owned(key1).expect("get key1"); + let item2 = pool.clone().get_owned(key2).expect("get key2"); + let pool2 = pool.clone(); + + test_dbg!(pool.clear(key1)); + + let t1 = thread::spawn(move || { + assert_eq!(item1.get_ref(), &String::from("hello")); + drop(item1); + }); + let t2 = thread::spawn(move || { + assert_eq!(item2.get_ref(), &String::from("goodbye")); + test_dbg!(pool2.clear(key2)); + drop(item2); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert!(pool.get(key1).is_none()); + assert!(pool.get(key2).is_none()); + }); +} + +#[test] +fn ownedrefs_outlive_pool() { + run_model("ownedrefs_outlive_pool", || { + let pool = Arc::new(Pool::<alloc::Track<String>>::new()); + let key1 = pool + .create_with(|item| item.get_mut().push_str("hello")) + .expect("create item 1"); + let key2 = pool + .create_with(|item| item.get_mut().push_str("goodbye")) + .expect("create item 2"); + + let item1_1 = pool.clone().get_owned(key1).expect("get key1"); + let item1_2 = pool.clone().get_owned(key1).expect("get key1 again"); + let item2 = pool.clone().get_owned(key2).expect("get key2"); + drop(pool); + + let t1 = thread::spawn(move || { + assert_eq!(item1_1.get_ref(), &String::from("hello")); + drop(item1_1); + }); + + let t2 = thread::spawn(move || { + assert_eq!(item2.get_ref(), &String::from("goodbye")); + drop(item2); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert_eq!(item1_2.get_ref(), &String::from("hello")); + }); +} + +#[test] +fn ownedref_ping_pong() { + run_model("ownedref_ping_pong", || { + let pool = Arc::new(Pool::<alloc::Track<String>>::new()); + let key1 = pool + .create_with(|item| item.get_mut().push_str("hello")) + .expect("create item 1"); + let key2 = pool + .create_with(|item| item.get_mut().push_str("world")) + .expect("create item 2"); + + let item1 = pool.clone().get_owned(key1).expect("get key1"); + let pool2 = pool.clone(); + let pool3 = pool.clone(); + + let t1 = thread::spawn(move || { + assert_eq!(item1.get_ref(), &String::from("hello")); + pool2.clear(key1); + item1 + }); + + let t2 = thread::spawn(move || { + let item2 = pool3.clone().get_owned(key2).unwrap(); + assert_eq!(item2.get_ref(), &String::from("world")); + pool3.clear(key1); + item2 + }); + + let item1 = t1.join().unwrap(); + let item2 = t2.join().unwrap(); + + assert_eq!(item1.get_ref(), &String::from("hello")); + assert_eq!(item2.get_ref(), &String::from("world")); + }); +} + +#[test] +fn ownedref_drop_from_other_threads() { + run_model("ownedref_drop_from_other_threads", || { + let pool = Arc::new(Pool::<alloc::Track<String>>::new()); + let key1 = pool + .create_with(|item| item.get_mut().push_str("hello")) + .expect("create item 1"); + let item1 = pool.clone().get_owned(key1).expect("get key1"); + + let pool2 = pool.clone(); + + let t1 = thread::spawn(move || { + let pool = pool2.clone(); + let key2 = pool + .create_with(|item| item.get_mut().push_str("goodbye")) + .expect("create item 1"); + let item2 = pool.clone().get_owned(key2).expect("get key1"); + let t2 = thread::spawn(move || { + assert_eq!(item2.get_ref(), &String::from("goodbye")); + test_dbg!(pool2.clear(key1)); + drop(item2) + }); + assert_eq!(item1.get_ref(), &String::from("hello")); + test_dbg!(pool.clear(key2)); + drop(item1); + (t2, key2) + }); + + let (t2, key2) = t1.join().unwrap(); + test_dbg!(pool.get(key1)); + test_dbg!(pool.get(key2)); + + t2.join().unwrap(); + + assert!(pool.get(key1).is_none()); + assert!(pool.get(key2).is_none()); + }); +} + +#[test] +fn create_owned_mut_guard() { + run_model("create_owned_mut_guard", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + guard.push_str("Hello world"); + drop(guard); + + t1.join().unwrap(); + }); +} + +#[test] +fn create_owned_mut_guard_send() { + run_model("create_owned_mut_guard", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + let t2 = thread::spawn(move || { + guard.push_str("Hello world"); + drop(guard); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + }); +} + +#[test] +fn create_owned_mut_guard_2() { + run_model("create_owned_mut_guard_2", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let pool3 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + guard.push_str("Hello world"); + let t2 = thread::spawn(move || { + test_dbg!(pool3.get(key)); + }); + drop(guard); + + t1.join().unwrap(); + t2.join().unwrap(); + }); +} + +#[test] +fn create_owned_mut_guard_downgrade() { + run_model("create_owned_mut_guard_downgrade", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + guard.push_str("Hello world"); + + let key: usize = guard.key(); + + let pool2 = pool.clone(); + let pool3 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + + let guard = guard.downgrade(); + let t2 = thread::spawn(move || { + assert_eq!(pool3.get(key).unwrap(), "Hello world".to_owned()); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + assert_eq!(guard, "Hello world".to_owned()); + }); +} + +#[test] +fn create_owned_mut_guard_downgrade_then_clear() { + run_model("create_owned_mut_guard_downgrade_then_clear", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + let key: usize = guard.key(); + + let pool2 = pool.clone(); + + guard.push_str("Hello world"); + let guard = guard.downgrade(); + let pool3 = pool.clone(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.get(key)); + }); + let t2 = thread::spawn(move || { + test_dbg!(pool3.clear(key)); + }); + + assert_eq!(guard, "Hello world".to_owned()); + drop(guard); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert!(pool.get(key).is_none()); + }); +} + +#[test] +fn create_owned_mut_downgrade_during_clear() { + run_model("create_owned_mut_downgrade_during_clear", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + let key: usize = guard.key(); + guard.push_str("Hello world"); + + let pool2 = pool.clone(); + let guard = guard.downgrade(); + let t1 = thread::spawn(move || { + test_dbg!(pool2.clear(key)); + }); + + t1.join().unwrap(); + + assert_eq!(guard, "Hello world".to_owned()); + drop(guard); + + assert!(pool.get(key).is_none()); + }); +} + +#[test] +fn create_mut_downgrade_during_clear_by_other_thead() { + run_model("create_mut_downgrade_during_clear_by_other_thread", || { + let pool = Arc::new(Pool::<String>::new()); + let mut guard = pool.clone().create_owned().unwrap(); + let key: usize = guard.key(); + guard.push_str("Hello world"); + + let pool2 = pool.clone(); + let t1 = thread::spawn(move || { + let guard = guard.downgrade(); + assert_eq!(guard, "Hello world".to_owned()); + drop(guard); + }); + + let t2 = thread::spawn(move || { + test_dbg!(pool2.clear(key)); + }); + + test_dbg!(pool.get(key)); + + t1.join().unwrap(); + t2.join().unwrap(); + }); +} diff --git a/vendor/sharded-slab/src/tests/loom_slab.rs b/vendor/sharded-slab/src/tests/loom_slab.rs new file mode 100644 index 000000000..58422f9a0 --- /dev/null +++ b/vendor/sharded-slab/src/tests/loom_slab.rs @@ -0,0 +1,760 @@ +use super::util::*; +use crate::sync::alloc; +use crate::Slab; +use loom::sync::{Condvar, Mutex}; +use loom::thread; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +#[test] +fn take_local() { + run_model("take_local", || { + let slab = Arc::new(Slab::new()); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + let idx = s.insert(1).expect("insert"); + assert_eq!(s.get(idx).unwrap(), 1); + assert_eq!(s.take(idx), Some(1)); + assert!(s.get(idx).is_none()); + let idx = s.insert(2).expect("insert"); + assert_eq!(s.get(idx).unwrap(), 2); + assert_eq!(s.take(idx), Some(2)); + assert!(s.get(idx).is_none()); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + let idx = s.insert(3).expect("insert"); + assert_eq!(s.get(idx).unwrap(), 3); + assert_eq!(s.take(idx), Some(3)); + assert!(s.get(idx).is_none()); + let idx = s.insert(4).expect("insert"); + assert_eq!(s.get(idx).unwrap(), 4); + assert_eq!(s.take(idx), Some(4)); + assert!(s.get(idx).is_none()); + }); + + let s = slab; + let idx1 = s.insert(5).expect("insert"); + assert_eq!(s.get(idx1).unwrap(), 5); + let idx2 = s.insert(6).expect("insert"); + assert_eq!(s.get(idx2).unwrap(), 6); + assert_eq!(s.take(idx1), Some(5)); + assert!(s.get(idx1).is_none()); + assert_eq!(s.get(idx2).unwrap(), 6); + assert_eq!(s.take(idx2), Some(6)); + assert!(s.get(idx2).is_none()); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + }); +} + +#[test] +fn take_remote() { + run_model("take_remote", || { + let slab = Arc::new(Slab::new()); + + let idx1 = slab.insert(1).expect("insert"); + assert_eq!(slab.get(idx1).unwrap(), 1); + let idx2 = slab.insert(2).expect("insert"); + assert_eq!(slab.get(idx2).unwrap(), 2); + + let idx3 = slab.insert(3).expect("insert"); + assert_eq!(slab.get(idx3).unwrap(), 3); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + assert_eq!(s.get(idx2).unwrap(), 2); + assert_eq!(s.take(idx2), Some(2)); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + assert_eq!(s.get(idx3).unwrap(), 3); + assert_eq!(s.take(idx3), Some(3)); + }); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + + assert_eq!(slab.get(idx1).unwrap(), 1); + assert!(slab.get(idx2).is_none()); + assert!(slab.get(idx3).is_none()); + }); +} + +#[test] +fn racy_take() { + run_model("racy_take", || { + let slab = Arc::new(Slab::new()); + + let idx = slab.insert(1).expect("insert"); + assert_eq!(slab.get(idx).unwrap(), 1); + + let s1 = slab.clone(); + let s2 = slab.clone(); + + let t1 = thread::spawn(move || s1.take(idx)); + let t2 = thread::spawn(move || s2.take(idx)); + + let r1 = t1.join().expect("thread 1 should not panic"); + let r2 = t2.join().expect("thread 2 should not panic"); + + assert!( + r1.is_none() || r2.is_none(), + "both threads should not have removed the value" + ); + assert_eq!( + r1.or(r2), + Some(1), + "one thread should have removed the value" + ); + assert!(slab.get(idx).is_none()); + }); +} + +#[test] +fn racy_take_local() { + run_model("racy_take_local", || { + let slab = Arc::new(Slab::new()); + + let idx = slab.insert(1).expect("insert"); + assert_eq!(slab.get(idx).unwrap(), 1); + + let s = slab.clone(); + let t2 = thread::spawn(move || s.take(idx)); + let r1 = slab.take(idx); + let r2 = t2.join().expect("thread 2 should not panic"); + + assert!( + r1.is_none() || r2.is_none(), + "both threads should not have removed the value" + ); + assert!( + r1.or(r2).is_some(), + "one thread should have removed the value" + ); + assert!(slab.get(idx).is_none()); + }); +} + +#[test] +fn concurrent_insert_take() { + run_model("concurrent_insert_remove", || { + let slab = Arc::new(Slab::new()); + let pair = Arc::new((Mutex::new(None), Condvar::new())); + + let slab2 = slab.clone(); + let pair2 = pair.clone(); + let remover = thread::spawn(move || { + let (lock, cvar) = &*pair2; + for i in 0..2 { + test_println!("--- remover i={} ---", i); + let mut next = lock.lock().unwrap(); + while next.is_none() { + next = cvar.wait(next).unwrap(); + } + let key = next.take().unwrap(); + assert_eq!(slab2.take(key), Some(i)); + cvar.notify_one(); + } + }); + + let (lock, cvar) = &*pair; + for i in 0..2 { + test_println!("--- inserter i={} ---", i); + let key = slab.insert(i).expect("insert"); + + let mut next = lock.lock().unwrap(); + *next = Some(key); + cvar.notify_one(); + + // Wait for the item to be removed. + while next.is_some() { + next = cvar.wait(next).unwrap(); + } + + assert!(slab.get(key).is_none()); + } + + remover.join().unwrap(); + }) +} + +#[test] +fn take_remote_and_reuse() { + run_model("take_remote_and_reuse", || { + let slab = Arc::new(Slab::new_with_config::<TinyConfig>()); + + let idx1 = slab.insert(1).expect("insert"); + let idx2 = slab.insert(2).expect("insert"); + let idx3 = slab.insert(3).expect("insert"); + let idx4 = slab.insert(4).expect("insert"); + + assert_eq!(slab.get(idx1).unwrap(), 1, "slab: {:#?}", slab); + assert_eq!(slab.get(idx2).unwrap(), 2, "slab: {:#?}", slab); + assert_eq!(slab.get(idx3).unwrap(), 3, "slab: {:#?}", slab); + assert_eq!(slab.get(idx4).unwrap(), 4, "slab: {:#?}", slab); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + assert_eq!(s.take(idx1), Some(1), "slab: {:#?}", s); + }); + + let idx1 = slab.insert(5).expect("insert"); + t1.join().expect("thread 1 should not panic"); + + assert_eq!(slab.get(idx1).unwrap(), 5, "slab: {:#?}", slab); + assert_eq!(slab.get(idx2).unwrap(), 2, "slab: {:#?}", slab); + assert_eq!(slab.get(idx3).unwrap(), 3, "slab: {:#?}", slab); + assert_eq!(slab.get(idx4).unwrap(), 4, "slab: {:#?}", slab); + }); +} + +fn store_when_free<C: crate::Config>(slab: &Arc<Slab<usize, C>>, t: usize) -> usize { + loop { + test_println!("try store {:?}", t); + if let Some(key) = slab.insert(t) { + test_println!("inserted at {:#x}", key); + return key; + } + test_println!("retrying; slab is full..."); + thread::yield_now(); + } +} + +struct TinierConfig; + +impl crate::Config for TinierConfig { + const INITIAL_PAGE_SIZE: usize = 2; + const MAX_PAGES: usize = 1; +} + +#[test] +fn concurrent_remove_remote_and_reuse() { + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + run_builder("concurrent_remove_remote_and_reuse", model, || { + let slab = Arc::new(Slab::new_with_config::<TinierConfig>()); + + let idx1 = slab.insert(1).unwrap(); + let idx2 = slab.insert(2).unwrap(); + + assert_eq!(slab.get(idx1).unwrap(), 1, "slab: {:#?}", slab); + assert_eq!(slab.get(idx2).unwrap(), 2, "slab: {:#?}", slab); + + let s = slab.clone(); + let s2 = slab.clone(); + + let t1 = thread::spawn(move || { + s.take(idx1).expect("must remove"); + }); + + let t2 = thread::spawn(move || { + s2.take(idx2).expect("must remove"); + }); + + let idx3 = store_when_free(&slab, 3); + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 1 should not panic"); + + assert!(slab.get(idx1).is_none(), "slab: {:#?}", slab); + assert!(slab.get(idx2).is_none(), "slab: {:#?}", slab); + assert_eq!(slab.get(idx3).unwrap(), 3, "slab: {:#?}", slab); + }); +} + +struct SetDropped { + val: usize, + dropped: std::sync::Arc<AtomicBool>, +} + +struct AssertDropped { + dropped: std::sync::Arc<AtomicBool>, +} + +impl AssertDropped { + fn new(val: usize) -> (Self, SetDropped) { + let dropped = std::sync::Arc::new(AtomicBool::new(false)); + let val = SetDropped { + val, + dropped: dropped.clone(), + }; + (Self { dropped }, val) + } + + fn assert_dropped(&self) { + assert!( + self.dropped.load(Ordering::SeqCst), + "value should have been dropped!" + ); + } +} + +impl Drop for SetDropped { + fn drop(&mut self) { + self.dropped.store(true, Ordering::SeqCst); + } +} + +#[test] +fn remove_local() { + run_model("remove_local", || { + let slab = Arc::new(Slab::new_with_config::<TinyConfig>()); + let slab2 = slab.clone(); + + let (dropped, item) = AssertDropped::new(1); + let idx = slab.insert(item).expect("insert"); + + let guard = slab.get(idx).unwrap(); + + assert!(slab.remove(idx)); + + let t1 = thread::spawn(move || { + let g = slab2.get(idx); + drop(g); + }); + + assert!(slab.get(idx).is_none()); + + t1.join().expect("thread 1 should not panic"); + + drop(guard); + assert!(slab.get(idx).is_none()); + dropped.assert_dropped(); + }) +} + +#[test] +fn remove_remote() { + run_model("remove_remote", || { + let slab = Arc::new(Slab::new_with_config::<TinyConfig>()); + let slab2 = slab.clone(); + + let (dropped, item) = AssertDropped::new(1); + let idx = slab.insert(item).expect("insert"); + + assert!(slab.remove(idx)); + let t1 = thread::spawn(move || { + let g = slab2.get(idx); + drop(g); + }); + + t1.join().expect("thread 1 should not panic"); + + assert!(slab.get(idx).is_none()); + dropped.assert_dropped(); + }); +} + +#[test] +fn remove_remote_during_insert() { + run_model("remove_remote_during_insert", || { + let slab = Arc::new(Slab::new_with_config::<TinyConfig>()); + let slab2 = slab.clone(); + + let (dropped, item) = AssertDropped::new(1); + let idx = slab.insert(item).expect("insert"); + + let t1 = thread::spawn(move || { + let g = slab2.get(idx); + assert_ne!(g.as_ref().map(|v| v.val), Some(2)); + drop(g); + }); + + let (_, item) = AssertDropped::new(2); + assert!(slab.remove(idx)); + let idx2 = slab.insert(item).expect("insert"); + + t1.join().expect("thread 1 should not panic"); + + assert!(slab.get(idx).is_none()); + assert!(slab.get(idx2).is_some()); + dropped.assert_dropped(); + }); +} + +#[test] +fn unique_iter() { + run_model("unique_iter", || { + let mut slab = std::sync::Arc::new(Slab::new()); + + let s = slab.clone(); + let t1 = thread::spawn(move || { + s.insert(1).expect("insert"); + s.insert(2).expect("insert"); + }); + + let s = slab.clone(); + let t2 = thread::spawn(move || { + s.insert(3).expect("insert"); + s.insert(4).expect("insert"); + }); + + t1.join().expect("thread 1 should not panic"); + t2.join().expect("thread 2 should not panic"); + + let slab = std::sync::Arc::get_mut(&mut slab).expect("other arcs should be dropped"); + let items: Vec<_> = slab.unique_iter().map(|&i| i).collect(); + assert!(items.contains(&1), "items: {:?}", items); + assert!(items.contains(&2), "items: {:?}", items); + assert!(items.contains(&3), "items: {:?}", items); + assert!(items.contains(&4), "items: {:?}", items); + }); +} + +#[test] +fn custom_page_sz() { + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + model.check(|| { + let slab = Slab::<usize>::new_with_config::<TinyConfig>(); + + for i in 0..1024usize { + test_println!("{}", i); + let k = slab.insert(i).expect("insert"); + let v = slab.get(k).expect("get"); + assert_eq!(v, i, "slab: {:#?}", slab); + } + }); +} + +#[test] +fn max_refs() { + struct LargeGenConfig; + + // Configure the slab with a very large number of bits for the generation + // counter. That way, there will be very few bits for the ref count left + // over, and this test won't have to malloc millions of references. + impl crate::cfg::Config for LargeGenConfig { + const INITIAL_PAGE_SIZE: usize = 2; + const MAX_THREADS: usize = 32; + const MAX_PAGES: usize = 2; + } + + let mut model = loom::model::Builder::new(); + model.max_branches = 100000; + model.check(|| { + let slab = Slab::new_with_config::<LargeGenConfig>(); + let key = slab.insert("hello world").unwrap(); + let max = crate::page::slot::RefCount::<LargeGenConfig>::MAX; + + // Create the maximum number of concurrent references to the entry. + let mut refs = (0..max) + .map(|_| slab.get(key).unwrap()) + // Store the refs in a vec so they don't get dropped immediately. + .collect::<Vec<_>>(); + + assert!(slab.get(key).is_none()); + + // After dropping a ref, we should now be able to access the slot again. + drop(refs.pop()); + let ref1 = slab.get(key); + assert!(ref1.is_some()); + + // Ref1 should max out the number of references again. + assert!(slab.get(key).is_none()); + }) +} + +mod free_list_reuse { + use super::*; + struct TinyConfig; + + impl crate::cfg::Config for TinyConfig { + const INITIAL_PAGE_SIZE: usize = 2; + } + + #[test] + fn local_remove() { + run_model("free_list_reuse::local_remove", || { + let slab = Slab::new_with_config::<TinyConfig>(); + + let t1 = slab.insert("hello").expect("insert"); + let t2 = slab.insert("world").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t1).1, + 0, + "1st slot should be on 0th page" + ); + assert_eq!( + crate::page::indices::<TinyConfig>(t2).1, + 0, + "2nd slot should be on 0th page" + ); + let t3 = slab.insert("earth").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t3).1, + 1, + "3rd slot should be on 1st page" + ); + + slab.remove(t2); + let t4 = slab.insert("universe").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t4).1, + 0, + "2nd slot should be reused (0th page)" + ); + + slab.remove(t1); + let _ = slab.insert("goodbye").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t4).1, + 0, + "1st slot should be reused (0th page)" + ); + }); + } + + #[test] + fn local_take() { + run_model("free_list_reuse::local_take", || { + let slab = Slab::new_with_config::<TinyConfig>(); + + let t1 = slab.insert("hello").expect("insert"); + let t2 = slab.insert("world").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t1).1, + 0, + "1st slot should be on 0th page" + ); + assert_eq!( + crate::page::indices::<TinyConfig>(t2).1, + 0, + "2nd slot should be on 0th page" + ); + let t3 = slab.insert("earth").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t3).1, + 1, + "3rd slot should be on 1st page" + ); + + assert_eq!(slab.take(t2), Some("world")); + let t4 = slab.insert("universe").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t4).1, + 0, + "2nd slot should be reused (0th page)" + ); + + assert_eq!(slab.take(t1), Some("hello")); + let _ = slab.insert("goodbye").expect("insert"); + assert_eq!( + crate::page::indices::<TinyConfig>(t4).1, + 0, + "1st slot should be reused (0th page)" + ); + }); + } +} + +#[test] +fn vacant_entry() { + run_model("vacant_entry", || { + let slab = Arc::new(Slab::new()); + let entry = slab.vacant_entry().unwrap(); + let key: usize = entry.key(); + + let slab2 = slab.clone(); + let t1 = thread::spawn(move || { + test_dbg!(slab2.get(key)); + }); + + entry.insert("hello world"); + t1.join().unwrap(); + + assert_eq!(slab.get(key).expect("get"), "hello world"); + }); +} + +#[test] +fn vacant_entry_2() { + run_model("vacant_entry_2", || { + let slab = Arc::new(Slab::new()); + let entry = slab.vacant_entry().unwrap(); + let key: usize = entry.key(); + + let slab2 = slab.clone(); + let slab3 = slab.clone(); + let t1 = thread::spawn(move || { + test_dbg!(slab2.get(key)); + }); + + entry.insert("hello world"); + let t2 = thread::spawn(move || { + test_dbg!(slab3.get(key)); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + assert_eq!(slab.get(key).expect("get"), "hello world"); + }); +} + +#[test] +fn vacant_entry_remove() { + run_model("vacant_entry_remove", || { + let slab = Arc::new(Slab::new()); + let entry = slab.vacant_entry().unwrap(); + let key: usize = entry.key(); + + let slab2 = slab.clone(); + let t1 = thread::spawn(move || { + assert!(!slab2.remove(key)); + }); + + t1.join().unwrap(); + + entry.insert("hello world"); + assert_eq!(slab.get(key).expect("get"), "hello world"); + }); +} + +#[test] +fn owned_entry_send_out_of_local() { + run_model("owned_entry_send_out_of_local", || { + let slab = Arc::new(Slab::<alloc::Track<String>>::new()); + let key1 = slab + .insert(alloc::Track::new(String::from("hello"))) + .expect("insert item 1"); + let key2 = slab + .insert(alloc::Track::new(String::from("goodbye"))) + .expect("insert item 2"); + + let item1 = slab.clone().get_owned(key1).expect("get key1"); + let item2 = slab.clone().get_owned(key2).expect("get key2"); + let slab2 = slab.clone(); + + test_dbg!(slab.remove(key1)); + + let t1 = thread::spawn(move || { + assert_eq!(item1.get_ref(), &String::from("hello")); + drop(item1); + }); + let t2 = thread::spawn(move || { + assert_eq!(item2.get_ref(), &String::from("goodbye")); + test_dbg!(slab2.remove(key2)); + drop(item2); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert!(slab.get(key1).is_none()); + assert!(slab.get(key2).is_none()); + }); +} + +#[test] +fn owned_entrys_outlive_slab() { + run_model("owned_entrys_outlive_slab", || { + let slab = Arc::new(Slab::<alloc::Track<String>>::new()); + let key1 = slab + .insert(alloc::Track::new(String::from("hello"))) + .expect("insert item 1"); + let key2 = slab + .insert(alloc::Track::new(String::from("goodbye"))) + .expect("insert item 2"); + + let item1_1 = slab.clone().get_owned(key1).expect("get key1"); + let item1_2 = slab.clone().get_owned(key1).expect("get key1 again"); + let item2 = slab.clone().get_owned(key2).expect("get key2"); + drop(slab); + + let t1 = thread::spawn(move || { + assert_eq!(item1_1.get_ref(), &String::from("hello")); + drop(item1_1); + }); + + let t2 = thread::spawn(move || { + assert_eq!(item2.get_ref(), &String::from("goodbye")); + drop(item2); + }); + + t1.join().unwrap(); + t2.join().unwrap(); + + assert_eq!(item1_2.get_ref(), &String::from("hello")); + }); +} + +#[test] +fn owned_entry_ping_pong() { + run_model("owned_entry_ping_pong", || { + let slab = Arc::new(Slab::<alloc::Track<String>>::new()); + let key1 = slab + .insert(alloc::Track::new(String::from("hello"))) + .expect("insert item 1"); + let key2 = slab + .insert(alloc::Track::new(String::from("world"))) + .expect("insert item 2"); + + let item1 = slab.clone().get_owned(key1).expect("get key1"); + let slab2 = slab.clone(); + let slab3 = slab.clone(); + + let t1 = thread::spawn(move || { + assert_eq!(item1.get_ref(), &String::from("hello")); + slab2.remove(key1); + item1 + }); + + let t2 = thread::spawn(move || { + let item2 = slab3.clone().get_owned(key2).unwrap(); + assert_eq!(item2.get_ref(), &String::from("world")); + slab3.remove(key1); + item2 + }); + + let item1 = t1.join().unwrap(); + let item2 = t2.join().unwrap(); + + assert_eq!(item1.get_ref(), &String::from("hello")); + assert_eq!(item2.get_ref(), &String::from("world")); + }); +} + +#[test] +fn owned_entry_drop_from_other_threads() { + run_model("owned_entry_drop_from_other_threads", || { + let slab = Arc::new(Slab::<alloc::Track<String>>::new()); + let key1 = slab + .insert(alloc::Track::new(String::from("hello"))) + .expect("insert item 1"); + let item1 = slab.clone().get_owned(key1).expect("get key1"); + + let slab2 = slab.clone(); + + let t1 = thread::spawn(move || { + let slab = slab2.clone(); + let key2 = slab + .insert(alloc::Track::new(String::from("goodbye"))) + .expect("insert item 1"); + let item2 = slab.clone().get_owned(key2).expect("get key1"); + let t2 = thread::spawn(move || { + assert_eq!(item2.get_ref(), &String::from("goodbye")); + test_dbg!(slab2.remove(key1)); + drop(item2) + }); + assert_eq!(item1.get_ref(), &String::from("hello")); + test_dbg!(slab.remove(key2)); + drop(item1); + (t2, key2) + }); + + let (t2, key2) = t1.join().unwrap(); + test_dbg!(slab.get(key1)); + test_dbg!(slab.get(key2)); + + t2.join().unwrap(); + + assert!(slab.get(key1).is_none()); + assert!(slab.get(key2).is_none()); + }); +} diff --git a/vendor/sharded-slab/src/tests/mod.rs b/vendor/sharded-slab/src/tests/mod.rs new file mode 100644 index 000000000..be153b573 --- /dev/null +++ b/vendor/sharded-slab/src/tests/mod.rs @@ -0,0 +1,71 @@ +mod idx { + use crate::{ + cfg, + page::{self, slot}, + Pack, Tid, + }; + use proptest::prelude::*; + + proptest! { + #[test] + #[cfg_attr(loom, ignore)] + fn tid_roundtrips(tid in 0usize..Tid::<cfg::DefaultConfig>::BITS) { + let tid = Tid::<cfg::DefaultConfig>::from_usize(tid); + let packed = tid.pack(0); + assert_eq!(tid, Tid::from_packed(packed)); + } + + #[test] + #[cfg_attr(loom, ignore)] + fn idx_roundtrips( + tid in 0usize..Tid::<cfg::DefaultConfig>::BITS, + gen in 0usize..slot::Generation::<cfg::DefaultConfig>::BITS, + addr in 0usize..page::Addr::<cfg::DefaultConfig>::BITS, + ) { + let tid = Tid::<cfg::DefaultConfig>::from_usize(tid); + let gen = slot::Generation::<cfg::DefaultConfig>::from_usize(gen); + let addr = page::Addr::<cfg::DefaultConfig>::from_usize(addr); + let packed = tid.pack(gen.pack(addr.pack(0))); + assert_eq!(addr, page::Addr::from_packed(packed)); + assert_eq!(gen, slot::Generation::from_packed(packed)); + assert_eq!(tid, Tid::from_packed(packed)); + } + } +} + +pub(crate) mod util { + #[cfg(loom)] + use std::sync::atomic::{AtomicUsize, Ordering}; + pub(crate) struct TinyConfig; + + impl crate::Config for TinyConfig { + const INITIAL_PAGE_SIZE: usize = 4; + } + + #[cfg(loom)] + pub(crate) fn run_model(name: &'static str, f: impl Fn() + Sync + Send + 'static) { + run_builder(name, loom::model::Builder::new(), f) + } + + #[cfg(loom)] + pub(crate) fn run_builder( + name: &'static str, + builder: loom::model::Builder, + f: impl Fn() + Sync + Send + 'static, + ) { + let iters = AtomicUsize::new(1); + builder.check(move || { + test_println!( + "\n------------ running test {}; iteration {} ------------\n", + name, + iters.fetch_add(1, Ordering::SeqCst) + ); + f() + }); + } +} + +#[cfg(loom)] +mod loom_pool; +#[cfg(loom)] +mod loom_slab; diff --git a/vendor/sharded-slab/src/tid.rs b/vendor/sharded-slab/src/tid.rs new file mode 100644 index 000000000..57d64f970 --- /dev/null +++ b/vendor/sharded-slab/src/tid.rs @@ -0,0 +1,194 @@ +use crate::{ + cfg::{self, CfgPrivate}, + page, + sync::{ + atomic::{AtomicUsize, Ordering}, + lazy_static, thread_local, Mutex, + }, + Pack, +}; +use std::{ + cell::{Cell, UnsafeCell}, + collections::VecDeque, + fmt, + marker::PhantomData, + sync::PoisonError, +}; + +/// Uniquely identifies a thread. +pub(crate) struct Tid<C> { + id: usize, + _not_send: PhantomData<UnsafeCell<()>>, + _cfg: PhantomData<fn(C)>, +} + +#[derive(Debug)] +struct Registration(Cell<Option<usize>>); + +struct Registry { + next: AtomicUsize, + free: Mutex<VecDeque<usize>>, +} + +lazy_static! { + static ref REGISTRY: Registry = Registry { + next: AtomicUsize::new(0), + free: Mutex::new(VecDeque::new()), + }; +} + +thread_local! { + static REGISTRATION: Registration = Registration::new(); +} + +// === impl Tid === + +impl<C: cfg::Config> Pack<C> for Tid<C> { + const LEN: usize = C::MAX_SHARDS.trailing_zeros() as usize + 1; + + type Prev = page::Addr<C>; + + #[inline(always)] + fn as_usize(&self) -> usize { + self.id + } + + #[inline(always)] + fn from_usize(id: usize) -> Self { + Self { + id, + _not_send: PhantomData, + _cfg: PhantomData, + } + } +} + +impl<C: cfg::Config> Tid<C> { + #[inline] + pub(crate) fn current() -> Self { + REGISTRATION + .try_with(Registration::current) + .unwrap_or_else(|_| Self::poisoned()) + } + + pub(crate) fn is_current(self) -> bool { + REGISTRATION + .try_with(|r| self == r.current::<C>()) + .unwrap_or(false) + } + + #[inline(always)] + pub fn new(id: usize) -> Self { + Self::from_usize(id) + } +} + +impl<C> Tid<C> { + #[cold] + fn poisoned() -> Self { + Self { + id: std::usize::MAX, + _not_send: PhantomData, + _cfg: PhantomData, + } + } + + /// Returns true if the local thread ID was accessed while unwinding. + pub(crate) fn is_poisoned(&self) -> bool { + self.id == std::usize::MAX + } +} + +impl<C> fmt::Debug for Tid<C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.is_poisoned() { + f.debug_tuple("Tid") + .field(&format_args!("<poisoned>")) + .finish() + } else { + f.debug_tuple("Tid") + .field(&format_args!("{}", self.id)) + .finish() + } + } +} + +impl<C> PartialEq for Tid<C> { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl<C> Eq for Tid<C> {} + +impl<C: cfg::Config> Clone for Tid<C> { + fn clone(&self) -> Self { + Self::new(self.id) + } +} + +impl<C: cfg::Config> Copy for Tid<C> {} + +// === impl Registration === + +impl Registration { + fn new() -> Self { + Self(Cell::new(None)) + } + + #[inline(always)] + fn current<C: cfg::Config>(&self) -> Tid<C> { + if let Some(tid) = self.0.get().map(Tid::new) { + return tid; + } + + self.register() + } + + #[cold] + fn register<C: cfg::Config>(&self) -> Tid<C> { + let id = REGISTRY + .free + .lock() + .ok() + .and_then(|mut free| { + if free.len() > 1 { + free.pop_front() + } else { + None + } + }) + .unwrap_or_else(|| { + let id = REGISTRY.next.fetch_add(1, Ordering::AcqRel); + if id > Tid::<C>::BITS { + panic_in_drop!( + "creating a new thread ID ({}) would exceed the \ + maximum number of thread ID bits specified in {} \ + ({})", + id, + std::any::type_name::<C>(), + Tid::<C>::BITS, + ); + } + id + }); + + self.0.set(Some(id)); + Tid::new(id) + } +} + +// Reusing thread IDs doesn't work under loom, since this `Drop` impl results in +// an access to a `loom` lazy_static while the test is shutting down, which +// panics. T_T +// Just skip TID reuse and use loom's lazy_static macro to ensure we have a +// clean initial TID on every iteration, instead. +#[cfg(not(all(loom, any(feature = "loom", test))))] +impl Drop for Registration { + fn drop(&mut self) { + if let Some(id) = self.0.get() { + let mut free_list = REGISTRY.free.lock().unwrap_or_else(PoisonError::into_inner); + free_list.push_back(id); + } + } +} |