author:    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
committer: Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-28 14:29:10 +0000
commit:    2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
tree:      b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/util/slab
parent:    Initial commit. (diff)
Adding upstream version 86.0.1. (upstream/86.0.1, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/util/slab')
-rw-r--r--  third_party/rust/tokio/src/util/slab/addr.rs              | 154
-rw-r--r--  third_party/rust/tokio/src/util/slab/entry.rs             |   7
-rw-r--r--  third_party/rust/tokio/src/util/slab/generation.rs        |  32
-rw-r--r--  third_party/rust/tokio/src/util/slab/mod.rs               | 107
-rw-r--r--  third_party/rust/tokio/src/util/slab/page.rs              | 187
-rw-r--r--  third_party/rust/tokio/src/util/slab/shard.rs             | 105
-rw-r--r--  third_party/rust/tokio/src/util/slab/slot.rs              |  42
-rw-r--r--  third_party/rust/tokio/src/util/slab/stack.rs             |  58
-rw-r--r--  third_party/rust/tokio/src/util/slab/tests/loom_slab.rs   | 327
-rw-r--r--  third_party/rust/tokio/src/util/slab/tests/loom_stack.rs  |  88
-rw-r--r--  third_party/rust/tokio/src/util/slab/tests/mod.rs         |   2
11 files changed, 1109 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/util/slab/addr.rs b/third_party/rust/tokio/src/util/slab/addr.rs
new file mode 100644
index 0000000000..c14e32e909
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/addr.rs
@@ -0,0 +1,154 @@
+//! Tracks the location of an entry in a slab.
+//!
+//! # Index packing
+//!
+//! A slab index is a single `usize` value that packs together several smaller
+//! indices, each corresponding to a different part of the slab.
+//!
+//! The least significant `MAX_PAGES + INITIAL_PAGE_SIZE.trailing_zeros() + 1`
+//! bits store the address within a shard, starting at 0 for the first slot on
+//! the first page. To index a slot within a shard, we first find the index of
+//! the page that the address falls on, and then the offset of the slot within
+//! that page.
+//!
+//! Since every page is twice as large as the previous page, and all page sizes
+//! are powers of two, we can determine the page index that contains a given
+//! address by shifting the address down by the smallest page size and looking
+//! at how many twos places are necessary to represent that number, telling us what
+//! power of two page size it fits inside of. We can determine the number of
+//! twos places by counting the number of leading zeros (unused twos places) in
+//! the number's binary representation, and subtracting that count from the
+//! total number of bits in a word.
+//!
+//! Once we know what page contains an address, we can subtract the size of all
+//! previous pages from the address to determine the offset within the page.
+//!
+//! After the page address, the next `MAX_THREADS.trailing_zeros() + 1` least
+//! significant bits are the thread ID. These are used to index the array of
+//! shards to find which shard a slot belongs to. If an entry is being removed
+//! and the thread ID of its index matches that of the current thread, we can
+//! use the `remove_local` fast path; otherwise, we have to use the synchronized
+//! `remove_remote` path.
+//!
+//! Finally, a generation value is packed into the index. The `RESERVED_BITS`
+//! most significant bits are left unused, and the remaining bits between the
+//! last bit of the thread ID and the first reserved bit are used to store the
+//! generation. The generation is used as part of an atomic read-modify-write
+//! loop every time a `ScheduledIo`'s readiness is modified, or when the
+//! resource is removed, to guard against the ABA problem.
+//!
+//! Visualized:
+//!
+//! ```text
+//! ┌──────────┬───────────────┬──────────────────┬──────────────────────────┐
+//! │ reserved │  generation   │    thread ID     │         address          │
+//! └▲─────────┴▲──────────────┴▲─────────────────┴▲────────────────────────▲┘
+//!  │          │               │                  │                        │
+//! bits(usize) │       bits(MAX_THREADS)          │                        0
+//!             │                                  │
+//!   bits(usize) - RESERVED      MAX_PAGES + bits(INITIAL_PAGE_SIZE)
+//! ```
+
+use crate::util::bit;
+use crate::util::slab::{Generation, INITIAL_PAGE_SIZE, MAX_PAGES, MAX_THREADS};
+
+use std::usize;
+
+/// References the location at which an entry is stored in a slab.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub(crate) struct Address(usize);
+
+const PAGE_INDEX_SHIFT: u32 = INITIAL_PAGE_SIZE.trailing_zeros() + 1;
+
+/// Address in the shard
+const SLOT: bit::Pack = bit::Pack::least_significant(MAX_PAGES as u32 + PAGE_INDEX_SHIFT);
+
+/// Masks the thread identifier
+const THREAD: bit::Pack = SLOT.then(MAX_THREADS.trailing_zeros() + 1);
+
+/// Masks the generation
+const GENERATION: bit::Pack = THREAD
+ .then(bit::pointer_width().wrapping_sub(RESERVED.width() + THREAD.width() + SLOT.width()));
+
+// Chosen arbitrarily
+const RESERVED: bit::Pack = bit::Pack::most_significant(5);
+
+impl Address {
+ /// Represents no entry, picked to avoid collision with Mio's internals.
+ /// This value should not be passed to mio.
+ pub(crate) const NULL: usize = usize::MAX >> 1;
+
+ /// Re-exported by `Generation`.
+ pub(super) const GENERATION_WIDTH: u32 = GENERATION.width();
+
+ pub(super) fn new(shard_index: usize, generation: Generation) -> Address {
+ let mut repr = 0;
+
+ repr = SLOT.pack(shard_index, repr);
+ repr = GENERATION.pack(generation.to_usize(), repr);
+
+ Address(repr)
+ }
+
+ /// Convert from a `usize` representation.
+ pub(crate) fn from_usize(src: usize) -> Address {
+ assert_ne!(src, Self::NULL);
+
+ Address(src)
+ }
+
+ /// Convert to a `usize` representation
+ pub(crate) fn to_usize(self) -> usize {
+ self.0
+ }
+
+ pub(crate) fn generation(self) -> Generation {
+ Generation::new(GENERATION.unpack(self.0))
+ }
+
+ /// Returns the page index
+ pub(super) fn page(self) -> usize {
+ // Since every page is twice as large as the previous page, and all page
+ // sizes are powers of two, we can determine the page index that
+ // contains a given address by shifting the address down by the smallest
+ // page size and looking at how many twos places are necessary to represent
+ // that number, telling us what power of two page size it fits inside
+ // of. We can determine the number of twos places by counting the number
+ // of leading zeros (unused twos places) in the number's binary
+ // representation, and subtracting that count from the total number of
+ // bits in a word.
+ let slot_shifted = (self.slot() + INITIAL_PAGE_SIZE) >> PAGE_INDEX_SHIFT;
+ (bit::pointer_width() - slot_shifted.leading_zeros()) as usize
+ }
+
+ /// Returns the slot index
+ pub(super) fn slot(self) -> usize {
+ SLOT.unpack(self.0)
+ }
+}
+
+#[cfg(test)]
+cfg_not_loom! {
+ use proptest::proptest;
+
+ #[test]
+ fn test_pack_format() {
+ assert_eq!(5, RESERVED.width());
+ assert_eq!(0b11111, RESERVED.max_value());
+ }
+
+ proptest! {
+ #[test]
+ fn address_roundtrips(
+ slot in 0usize..SLOT.max_value(),
+ generation in 0usize..Generation::MAX,
+ ) {
+ let address = Address::new(slot, Generation::new(generation));
+ // Round trip
+ let address = Address::from_usize(address.to_usize());
+
+ assert_eq!(address.slot(), slot);
+ assert_eq!(address.generation().to_usize(), generation);
+ }
+ }
+}
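The page-index trick described in the module comment above is easy to check in isolation. Below is a minimal standalone sketch, assuming the non-loom `INITIAL_PAGE_SIZE` of 32 and recomputing the shift outside tokio's `bit` module; `page_of` is a hypothetical helper, not part of the crate.

```rust
// Standalone sketch of the page() math from addr.rs. Assumes the non-loom
// INITIAL_PAGE_SIZE of 32; `page_of` is a hypothetical helper, not tokio API.
const INITIAL_PAGE_SIZE: usize = 32;
const PAGE_INDEX_SHIFT: u32 = INITIAL_PAGE_SIZE.trailing_zeros() + 1; // 6

fn page_of(slot: usize) -> usize {
    // Shift the address down by the smallest page size, then use the
    // position of the highest set bit to recover which power-of-two page
    // the slot falls on.
    let slot_shifted = (slot + INITIAL_PAGE_SIZE) >> PAGE_INDEX_SHIFT;
    (usize::BITS - slot_shifted.leading_zeros()) as usize
}

fn main() {
    // Page 0 holds slots 0..32, page 1 holds 32..96, page 2 holds 96..224.
    assert_eq!(page_of(0), 0);
    assert_eq!(page_of(31), 0);
    assert_eq!(page_of(32), 1);
    assert_eq!(page_of(95), 1);
    assert_eq!(page_of(96), 2);
    assert_eq!(page_of(224), 3);
}
```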
diff --git a/third_party/rust/tokio/src/util/slab/entry.rs b/third_party/rust/tokio/src/util/slab/entry.rs
new file mode 100644
index 0000000000..2e0b10b0fd
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/entry.rs
@@ -0,0 +1,7 @@
+use crate::util::slab::Generation;
+
+pub(crate) trait Entry: Default {
+ fn generation(&self) -> Generation;
+
+ fn reset(&self, generation: Generation) -> bool;
+}
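For a sense of what the `Entry` contract asks of an implementor (tokio's real one is `ScheduledIo` in the I/O driver), here is a hypothetical sketch using a bare `AtomicUsize` as the generation; `reset` must refuse stale generations so a token from a previous occupant of the slot cannot clobber the new one.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical Entry-like type, not tokio's ScheduledIo. The generation is
// kept in an atomic so `reset` can refuse tokens from a previous occupant.
#[derive(Default)]
struct DummyEntry {
    generation: AtomicUsize,
}

impl DummyEntry {
    fn generation(&self) -> usize {
        self.generation.load(Ordering::Acquire)
    }

    // Mirrors Entry::reset: succeed (and bump the generation) only if the
    // caller's generation is still current; a CAS keeps check-and-bump atomic.
    fn reset(&self, generation: usize) -> bool {
        self.generation
            .compare_exchange(
                generation,
                generation.wrapping_add(1),
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .is_ok()
    }
}

fn main() {
    let entry = DummyEntry::default();
    let current = entry.generation();
    assert!(entry.reset(current)); // current token: reset succeeds
    assert!(!entry.reset(current)); // stale token: reset is refused
}
```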
diff --git a/third_party/rust/tokio/src/util/slab/generation.rs b/third_party/rust/tokio/src/util/slab/generation.rs
new file mode 100644
index 0000000000..4b16b2caf6
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/generation.rs
@@ -0,0 +1,32 @@
+use crate::util::bit;
+use crate::util::slab::Address;
+
+/// A mutation identifier for a slot in the slab. The generation helps prevent
+/// accessing an entry with an outdated token.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
+pub(crate) struct Generation(usize);
+
+impl Generation {
+ pub(crate) const WIDTH: u32 = Address::GENERATION_WIDTH;
+
+ pub(super) const MAX: usize = bit::mask_for(Address::GENERATION_WIDTH);
+
+ /// Create a new generation
+ ///
+ /// # Panics
+ ///
+ /// Panics if `value` is greater than max generation.
+ pub(crate) fn new(value: usize) -> Generation {
+ assert!(value <= Self::MAX);
+ Generation(value)
+ }
+
+ /// Returns the next generation value
+ pub(crate) fn next(self) -> Generation {
+ Generation((self.0 + 1) & Self::MAX)
+ }
+
+ pub(crate) fn to_usize(self) -> usize {
+ self.0
+ }
+}
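Because `next()` masks with `MAX`, the counter wraps instead of growing forever. A quick sketch of the same arithmetic, assuming a 3-bit width rather than the real `Address::GENERATION_WIDTH`:

```rust
// Wrap-around of Generation::next, sketched with an assumed 3-bit width.
const WIDTH: u32 = 3;
const MAX: usize = (1 << WIDTH) - 1; // equivalent to bit::mask_for(WIDTH)

fn next(generation: usize) -> usize {
    (generation + 1) & MAX
}

fn main() {
    assert_eq!(next(0), 1);
    assert_eq!(next(MAX - 1), MAX);
    // At MAX the counter wraps to zero, so the ABA guard only distinguishes
    // up to 2^WIDTH reuses of the same slot between two observations.
    assert_eq!(next(MAX), 0);
}
```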
diff --git a/third_party/rust/tokio/src/util/slab/mod.rs b/third_party/rust/tokio/src/util/slab/mod.rs
new file mode 100644
index 0000000000..5082970507
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/mod.rs
@@ -0,0 +1,107 @@
+//! A lock-free concurrent slab.
+
+mod addr;
+pub(crate) use addr::Address;
+
+mod entry;
+pub(crate) use entry::Entry;
+
+mod generation;
+pub(crate) use generation::Generation;
+
+mod page;
+
+mod shard;
+use shard::Shard;
+
+mod slot;
+use slot::Slot;
+
+mod stack;
+use stack::TransferStack;
+
+#[cfg(all(loom, test))]
+mod tests;
+
+use crate::loom::sync::Mutex;
+use crate::util::bit;
+
+use std::fmt;
+
+#[cfg(target_pointer_width = "64")]
+const MAX_THREADS: usize = 4096;
+
+#[cfg(target_pointer_width = "32")]
+const MAX_THREADS: usize = 2048;
+
+/// Max number of pages per slab
+const MAX_PAGES: usize = bit::pointer_width() as usize / 4;
+
+cfg_not_loom! {
+ /// Size of first page
+ const INITIAL_PAGE_SIZE: usize = 32;
+}
+
+cfg_loom! {
+ const INITIAL_PAGE_SIZE: usize = 2;
+}
+
+/// A sharded slab.
+pub(crate) struct Slab<T> {
+ // Single shard for now. Eventually there will be more.
+ shard: Shard<T>,
+ local: Mutex<()>,
+}
+
+unsafe impl<T: Send> Send for Slab<T> {}
+unsafe impl<T: Sync> Sync for Slab<T> {}
+
+impl<T: Entry> Slab<T> {
+ /// Returns a new slab with the default configuration parameters.
+ pub(crate) fn new() -> Slab<T> {
+ Slab {
+ shard: Shard::new(),
+ local: Mutex::new(()),
+ }
+ }
+
+ /// Allocates a slot in the slab, returning a key that can be used to
+ /// access it.
+ ///
+ /// If this function returns `None`, then the shard for the current thread
+ /// is full and no items can be added until some are removed, or the maximum
+ /// number of shards has been reached.
+ pub(crate) fn alloc(&self) -> Option<Address> {
+ // we must lock the slab to alloc an item.
+ let _local = self.local.lock().unwrap();
+ self.shard.alloc()
+ }
+
+ /// Removes the value associated with the given key from the slab.
+ pub(crate) fn remove(&self, idx: Address) {
+ // try to lock the slab so that we can use `remove_local`.
+ let lock = self.local.try_lock();
+
+ // if we were able to lock the slab, we are "local" and can use the fast
+ // path; otherwise, we will use `remove_remote`.
+ if lock.is_ok() {
+ self.shard.remove_local(idx)
+ } else {
+ self.shard.remove_remote(idx)
+ }
+ }
+
+ /// Return a reference to the value associated with the given key.
+ ///
+ /// If the slab does not contain a value for the given key, `None` is
+ /// returned instead.
+ pub(crate) fn get(&self, token: Address) -> Option<&T> {
+ self.shard.get(token)
+ }
+}
+
+impl<T> fmt::Debug for Slab<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Slab").field("shard", &self.shard).finish()
+ }
+}
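The constants above fix the shard's capacity: page `n` holds `INITIAL_PAGE_SIZE << n` slots, so the whole shard is a geometric series. A standalone recomputation, assuming the non-loom `INITIAL_PAGE_SIZE` of 32 and deriving `MAX_PAGES` from the pointer width exactly as `mod.rs` does:

```rust
// Capacity implied by the constants in mod.rs (non-loom INITIAL_PAGE_SIZE).
const MAX_PAGES: usize = usize::BITS as usize / 4; // 16 on 64-bit, 8 on 32-bit
const INITIAL_PAGE_SIZE: usize = 32;

fn main() {
    // Summing the doubling pages gives 32 * (2^MAX_PAGES - 1) slots.
    let total: usize = (0..MAX_PAGES).map(|n| INITIAL_PAGE_SIZE << n).sum();
    assert_eq!(total, INITIAL_PAGE_SIZE * ((1 << MAX_PAGES) - 1));
    println!("one shard addresses {} slots", total); // 2_097_120 on 64-bit
}
```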
diff --git a/third_party/rust/tokio/src/util/slab/page.rs b/third_party/rust/tokio/src/util/slab/page.rs
new file mode 100644
index 0000000000..0000e934de
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/page.rs
@@ -0,0 +1,187 @@
+use crate::loom::cell::UnsafeCell;
+use crate::util::slab::{Address, Entry, Slot, TransferStack, INITIAL_PAGE_SIZE};
+
+use std::fmt;
+
+/// Data accessed only by the thread that owns the shard.
+pub(crate) struct Local {
+ head: UnsafeCell<usize>,
+}
+
+/// Data accessed by any thread.
+pub(crate) struct Shared<T> {
+ remote: TransferStack,
+ size: usize,
+ prev_sz: usize,
+ slab: UnsafeCell<Option<Box<[Slot<T>]>>>,
+}
+
+/// Returns the size of the page at index `n`
+pub(super) fn size(n: usize) -> usize {
+ INITIAL_PAGE_SIZE << n
+}
+
+impl Local {
+ pub(crate) fn new() -> Self {
+ Self {
+ head: UnsafeCell::new(0),
+ }
+ }
+
+ fn head(&self) -> usize {
+ self.head.with(|head| unsafe { *head })
+ }
+
+ fn set_head(&self, new_head: usize) {
+ self.head.with_mut(|head| unsafe {
+ *head = new_head;
+ })
+ }
+}
+
+impl<T: Entry> Shared<T> {
+ pub(crate) fn new(size: usize, prev_sz: usize) -> Shared<T> {
+ Self {
+ prev_sz,
+ size,
+ remote: TransferStack::new(),
+ slab: UnsafeCell::new(None),
+ }
+ }
+
+ /// Allocates storage for this page if it does not already exist.
+ ///
+ /// This requires unique access to the page (e.g. it is called from the
+ /// thread that owns the page, or, in the case of `SingleShard`, while the
+ /// lock is held). In order to indicate this, a reference to the page's
+ /// `Local` data is taken by this function; the `Local` argument is not
+ /// actually used, but requiring it ensures that this is only called when
+ /// local access is held.
+ #[cold]
+ fn alloc_page(&self, _: &Local) {
+ debug_assert!(self.slab.with(|s| unsafe { (*s).is_none() }));
+
+ let mut slab = Vec::with_capacity(self.size);
+ slab.extend((1..self.size).map(Slot::new));
+ slab.push(Slot::new(Address::NULL));
+
+ self.slab.with_mut(|s| {
+ // this mut access is safe — it only occurs to initially
+ // allocate the page, which only happens on this thread; if the
+ // page has not yet been allocated, other threads will not try
+ // to access it yet.
+ unsafe {
+ *s = Some(slab.into_boxed_slice());
+ }
+ });
+ }
+
+ pub(crate) fn alloc(&self, local: &Local) -> Option<Address> {
+ let head = local.head();
+
+ // are there any items on the local free list? (fast path)
+ let head = if head < self.size {
+ head
+ } else {
+ // if the local free list is empty, pop all the items on the remote
+ // free list onto the local free list.
+ self.remote.pop_all()?
+ };
+
+ // if the head is still null, both the local and remote free lists are
+ // empty --- we can't fit any more items on this page.
+ if head == Address::NULL {
+ return None;
+ }
+
+ // do we need to allocate storage for this page?
+ let page_needs_alloc = self.slab.with(|s| unsafe { (*s).is_none() });
+ if page_needs_alloc {
+ self.alloc_page(local);
+ }
+
+ let gen = self.slab.with(|slab| {
+ let slab = unsafe { &*(slab) }
+ .as_ref()
+ .expect("page must have been allocated to alloc!");
+
+ let slot = &slab[head];
+
+ local.set_head(slot.next());
+ slot.generation()
+ });
+
+ let index = head + self.prev_sz;
+
+ Some(Address::new(index, gen))
+ }
+
+ pub(crate) fn get(&self, addr: Address) -> Option<&T> {
+ let page_offset = addr.slot() - self.prev_sz;
+
+ self.slab
+ .with(|slab| unsafe { &*slab }.as_ref()?.get(page_offset))
+ .map(|slot| slot.get())
+ }
+
+ pub(crate) fn remove_local(&self, local: &Local, addr: Address) {
+ let offset = addr.slot() - self.prev_sz;
+
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+
+ if slot.reset(addr.generation()) {
+ slot.set_next(local.head());
+ local.set_head(offset);
+ }
+ })
+ }
+
+ pub(crate) fn remove_remote(&self, addr: Address) {
+ let offset = addr.slot() - self.prev_sz;
+
+ self.slab.with(|slab| {
+ let slab = unsafe { &*slab }.as_ref();
+
+ let slot = if let Some(slot) = slab.and_then(|slab| slab.get(offset)) {
+ slot
+ } else {
+ return;
+ };
+
+ if !slot.reset(addr.generation()) {
+ return;
+ }
+
+ self.remote.push(offset, |next| slot.set_next(next));
+ })
+ }
+}
+
+impl fmt::Debug for Local {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.head.with(|head| {
+ let head = unsafe { *head };
+ f.debug_struct("Local")
+ .field("head", &format_args!("{:#0x}", head))
+ .finish()
+ })
+ }
+}
+
+impl<T> fmt::Debug for Shared<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shared")
+ .field("remote", &self.remote)
+ .field("prev_sz", &self.prev_sz)
+ .field("size", &self.size)
+ // .field("slab", &self.slab)
+ .finish()
+ }
+}
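`alloc_page` threads the free list through the slots themselves: slot `i` starts out pointing at `i + 1`, the last slot terminates the chain, and `Local::head` holds the index of the first free slot. A single-threaded sketch of that index-based free list, with no atomics or generations and `usize::MAX` standing in for `Address::NULL`:

```rust
// Single-threaded sketch of the index-based free list that Local::head and
// Slot::next implement together. usize::MAX stands in for Address::NULL.
const NULL: usize = usize::MAX;

struct FreeList {
    head: usize,
    next: Vec<usize>, // next[i] = index of the free slot after slot i
}

impl FreeList {
    fn new(size: usize) -> Self {
        // Slot i points at i + 1 and the last slot ends the chain, mirroring
        // `slab.extend((1..self.size).map(Slot::new))` + `Slot::new(NULL)`.
        let mut next: Vec<usize> = (1..size).collect();
        next.push(NULL);
        FreeList { head: 0, next }
    }

    fn alloc(&mut self) -> Option<usize> {
        if self.head == NULL {
            return None; // page is full
        }
        let slot = self.head;
        self.head = self.next[slot]; // pop the head of the free list
        Some(slot)
    }

    fn free(&mut self, slot: usize) {
        // Push the slot back onto the front, as remove_local does.
        self.next[slot] = self.head;
        self.head = slot;
    }
}

fn main() {
    let mut page = FreeList::new(4);
    assert_eq!(page.alloc(), Some(0));
    assert_eq!(page.alloc(), Some(1));
    page.free(0);
    // Freed slots are reused in LIFO order.
    assert_eq!(page.alloc(), Some(0));
    assert_eq!(page.alloc(), Some(2));
}
```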
diff --git a/third_party/rust/tokio/src/util/slab/shard.rs b/third_party/rust/tokio/src/util/slab/shard.rs
new file mode 100644
index 0000000000..eaca6f656a
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/shard.rs
@@ -0,0 +1,105 @@
+use crate::util::slab::{page, Address, Entry, MAX_PAGES};
+
+use std::fmt;
+
+// ┌─────────────┐      ┌────────┐
+// │ page 1      │      │        │
+// ├─────────────┤ ┌───▶│  next──┼─┐
+// │ page 2      │ │    ├────────┤ │
+// │             │ │    │XXXXXXXX│ │
+// │ local_free──┼─┘    ├────────┤ │
+// │ global_free─┼─┐    │        │◀┘
+// ├─────────────┤ └───▶│  next──┼─┐
+// │   page 3    │      ├────────┤ │
+// └─────────────┘      │XXXXXXXX│ │
+//       ...            ├────────┤ │
+// ┌─────────────┐      │XXXXXXXX│ │
+// │   page n    │      ├────────┤ │
+// └─────────────┘      │        │◀┘
+//                      │  next──┼───▶
+//                      ├────────┤
+//                      │XXXXXXXX│
+//                      └────────┘
+//                         ...
+pub(super) struct Shard<T> {
+ /// The local free list for each page.
+ ///
+ /// These are only ever accessed from this shard's thread, so they are
+ /// stored separately from the shared state for the page that can be
+ /// accessed concurrently, to minimize false sharing.
+ local: Box<[page::Local]>,
+ /// The shared state for each page in this shard.
+ ///
+ /// This consists of the page's metadata (size, previous size), remote free
+ /// list, and a pointer to the actual array backing that page.
+ shared: Box<[page::Shared<T>]>,
+}
+
+impl<T: Entry> Shard<T> {
+ pub(super) fn new() -> Shard<T> {
+ let mut total_sz = 0;
+ let shared = (0..MAX_PAGES)
+ .map(|page_num| {
+ let sz = page::size(page_num);
+ let prev_sz = total_sz;
+ total_sz += sz;
+ page::Shared::new(sz, prev_sz)
+ })
+ .collect();
+
+ let local = (0..MAX_PAGES).map(|_| page::Local::new()).collect();
+
+ Shard { local, shared }
+ }
+
+ pub(super) fn alloc(&self) -> Option<Address> {
+ // Can we fit the value into an existing page?
+ for (page_idx, page) in self.shared.iter().enumerate() {
+ let local = self.local(page_idx);
+
+ if let Some(page_offset) = page.alloc(local) {
+ return Some(page_offset);
+ }
+ }
+
+ None
+ }
+
+ pub(super) fn get(&self, addr: Address) -> Option<&T> {
+ let page_idx = addr.page();
+
+ if page_idx >= self.shared.len() {
+ return None;
+ }
+
+ self.shared[page_idx].get(addr)
+ }
+
+ /// Remove an item on the shard's local thread.
+ pub(super) fn remove_local(&self, addr: Address) {
+ let page_idx = addr.page();
+
+ if let Some(page) = self.shared.get(page_idx) {
+ page.remove_local(self.local(page_idx), addr);
+ }
+ }
+
+ /// Remove an item, while on a different thread from the shard's local thread.
+ pub(super) fn remove_remote(&self, addr: Address) {
+ if let Some(page) = self.shared.get(addr.page()) {
+ page.remove_remote(addr);
+ }
+ }
+
+ fn local(&self, i: usize) -> &page::Local {
+ &self.local[i]
+ }
+}
+
+impl<T> fmt::Debug for Shard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Shard")
+ .field("shared", &self.shared)
+ .finish()
+ }
+}
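Putting the shard together: a shard-level slot index first selects a page and then an offset inside it, with `prev_sz` (the number of slots on all earlier pages) subtracted off exactly as `remove_local` and `remove_remote` do. A standalone sketch that walks the doubling pages instead of using the O(1) `leading_zeros` trick from `addr.rs`, assuming the non-loom `INITIAL_PAGE_SIZE`:

```rust
// Sketch of shard-level slot -> (page, offset) resolution, assuming the
// non-loom INITIAL_PAGE_SIZE of 32. The real code gets `page` in O(1) via
// leading_zeros (see addr.rs); this version just walks the doubling pages.
const INITIAL_PAGE_SIZE: usize = 32;

fn page_and_offset(slot: usize) -> (usize, usize) {
    let mut page = 0;
    let mut prev_sz = 0; // slots on all earlier pages, as in Shard::new
    let mut size = INITIAL_PAGE_SIZE;
    while slot >= prev_sz + size {
        prev_sz += size;
        size <<= 1;
        page += 1;
    }
    (page, slot - prev_sz)
}

fn main() {
    assert_eq!(page_and_offset(0), (0, 0));
    assert_eq!(page_and_offset(31), (0, 31));
    assert_eq!(page_and_offset(32), (1, 0)); // first slot of page 1
    assert_eq!(page_and_offset(100), (2, 4)); // page 2 starts at slot 96
}
```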
diff --git a/third_party/rust/tokio/src/util/slab/slot.rs b/third_party/rust/tokio/src/util/slab/slot.rs
new file mode 100644
index 0000000000..0608b26189
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/slot.rs
@@ -0,0 +1,42 @@
+use crate::loom::cell::UnsafeCell;
+use crate::util::slab::{Entry, Generation};
+
+/// Stores an entry in the slab.
+pub(super) struct Slot<T> {
+ next: UnsafeCell<usize>,
+ entry: T,
+}
+
+impl<T: Entry> Slot<T> {
+ /// Initialize a new `Slot` linked to `next`.
+ ///
+ /// The entry is initialized to a default value.
+ pub(super) fn new(next: usize) -> Slot<T> {
+ Slot {
+ next: UnsafeCell::new(next),
+ entry: T::default(),
+ }
+ }
+
+ pub(super) fn get(&self) -> &T {
+ &self.entry
+ }
+
+ pub(super) fn generation(&self) -> Generation {
+ self.entry.generation()
+ }
+
+ pub(super) fn reset(&self, generation: Generation) -> bool {
+ self.entry.reset(generation)
+ }
+
+ pub(super) fn next(&self) -> usize {
+ self.next.with(|next| unsafe { *next })
+ }
+
+ pub(super) fn set_next(&self, next: usize) {
+ self.next.with_mut(|n| unsafe {
+ (*n) = next;
+ })
+ }
+}
diff --git a/third_party/rust/tokio/src/util/slab/stack.rs b/third_party/rust/tokio/src/util/slab/stack.rs
new file mode 100644
index 0000000000..0ae0d71006
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/stack.rs
@@ -0,0 +1,58 @@
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::util::slab::Address;
+
+use std::fmt;
+use std::sync::atomic::Ordering;
+use std::usize;
+
+pub(super) struct TransferStack {
+ head: AtomicUsize,
+}
+
+impl TransferStack {
+ pub(super) fn new() -> Self {
+ Self {
+ head: AtomicUsize::new(Address::NULL),
+ }
+ }
+
+ pub(super) fn pop_all(&self) -> Option<usize> {
+ let val = self.head.swap(Address::NULL, Ordering::Acquire);
+
+ if val == Address::NULL {
+ None
+ } else {
+ Some(val)
+ }
+ }
+
+ pub(super) fn push(&self, value: usize, before: impl Fn(usize)) {
+ let mut next = self.head.load(Ordering::Relaxed);
+
+ loop {
+ before(next);
+
+ match self
+ .head
+ .compare_exchange(next, value, Ordering::AcqRel, Ordering::Acquire)
+ {
+ // lost the race!
+ Err(actual) => next = actual,
+ Ok(_) => return,
+ }
+ }
+ }
+}
+
+impl fmt::Debug for TransferStack {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ // Loom likes to dump all its internal state in `fmt::Debug` impls, so
+ // we override this to just print the current value in tests.
+ f.debug_struct("TransferStack")
+ .field(
+ "head",
+ &format_args!("{:#x}", self.head.load(Ordering::Relaxed)),
+ )
+ .finish()
+ }
+}
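The `TransferStack` is a Treiber-style stack of indices: `push` links the new node to the observed head before a compare-and-swap installs it, and `pop_all` detaches the whole list with one `swap`. A standalone sketch of the same pattern, with the `next` links kept in a side array of atomics instead of inside slab slots and `usize::MAX` standing in for `Address::NULL`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const NULL: usize = usize::MAX;

// Treiber-style transfer stack over indices; `next` links live in a side
// array of atomics here rather than inside the slots they describe.
struct Stack<const N: usize> {
    head: AtomicUsize,
    next: [AtomicUsize; N],
}

impl<const N: usize> Stack<N> {
    fn new() -> Self {
        Stack {
            head: AtomicUsize::new(NULL),
            next: std::array::from_fn(|_| AtomicUsize::new(NULL)),
        }
    }

    fn push(&self, index: usize) {
        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            // Link to the observed head before publishing, just like the
            // `before` closure passed to TransferStack::push.
            self.next[index].store(head, Ordering::Relaxed);
            match self
                .head
                .compare_exchange(head, index, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return,
                Err(actual) => head = actual, // lost the race; retry
            }
        }
    }

    // Detach the entire list with one atomic swap, returning the old head.
    fn pop_all(&self) -> Option<usize> {
        let head = self.head.swap(NULL, Ordering::Acquire);
        if head == NULL {
            None
        } else {
            Some(head)
        }
    }
}

fn main() {
    let stack: Stack<4> = Stack::new();
    stack.push(2);
    stack.push(0);
    // Walk the detached list: 0 -> 2 -> end.
    let mut cursor = stack.pop_all();
    while let Some(i) = cursor {
        println!("popped index {}", i);
        let next = stack.next[i].load(Ordering::Relaxed);
        cursor = if next == NULL { None } else { Some(next) };
    }
    assert!(stack.pop_all().is_none());
}
```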
diff --git a/third_party/rust/tokio/src/util/slab/tests/loom_slab.rs b/third_party/rust/tokio/src/util/slab/tests/loom_slab.rs
new file mode 100644
index 0000000000..48e94f0034
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/tests/loom_slab.rs
@@ -0,0 +1,327 @@
+use crate::io::driver::ScheduledIo;
+use crate::util::slab::{Address, Slab};
+
+use loom::sync::{Arc, Condvar, Mutex};
+use loom::thread;
+
+#[test]
+fn local_remove() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let s = slab.clone();
+ let t1 = thread::spawn(move || {
+ let idx = store_val(&s, 1);
+ assert_eq!(get_val(&s, idx), Some(1));
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ let idx = store_val(&s, 2);
+ assert_eq!(get_val(&s, idx), Some(2));
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ });
+
+ let s = slab.clone();
+ let t2 = thread::spawn(move || {
+ let idx = store_val(&s, 3);
+ assert_eq!(get_val(&s, idx), Some(3));
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ let idx = store_val(&s, 4);
+ s.remove(idx);
+ assert_eq!(get_val(&s, idx), None);
+ });
+
+ let s = slab;
+ let idx1 = store_val(&s, 5);
+ assert_eq!(get_val(&s, idx1), Some(5));
+ let idx2 = store_val(&s, 6);
+ assert_eq!(get_val(&s, idx2), Some(6));
+ s.remove(idx1);
+ assert_eq!(get_val(&s, idx1), None);
+ assert_eq!(get_val(&s, idx2), Some(6));
+ s.remove(idx2);
+ assert_eq!(get_val(&s, idx2), None);
+
+ t1.join().expect("thread 1 should not panic");
+ t2.join().expect("thread 2 should not panic");
+ });
+}
+
+#[test]
+fn remove_remote() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let idx1 = store_val(&slab, 1);
+ assert_eq!(get_val(&slab, idx1), Some(1));
+
+ let idx2 = store_val(&slab, 2);
+ assert_eq!(get_val(&slab, idx2), Some(2));
+
+ let idx3 = store_val(&slab, 3);
+ assert_eq!(get_val(&slab, idx3), Some(3));
+
+ let s = slab.clone();
+ let t1 = thread::spawn(move || {
+ assert_eq!(get_val(&s, idx2), Some(2));
+ s.remove(idx2);
+ assert_eq!(get_val(&s, idx2), None);
+ });
+
+ let s = slab.clone();
+ let t2 = thread::spawn(move || {
+ assert_eq!(get_val(&s, idx3), Some(3));
+ s.remove(idx3);
+ assert_eq!(get_val(&s, idx3), None);
+ });
+
+ t1.join().expect("thread 1 should not panic");
+ t2.join().expect("thread 2 should not panic");
+
+ assert_eq!(get_val(&slab, idx1), Some(1));
+ assert_eq!(get_val(&slab, idx2), None);
+ assert_eq!(get_val(&slab, idx3), None);
+ });
+}
+
+#[test]
+fn remove_remote_and_reuse() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let idx1 = store_val(&slab, 1);
+ let idx2 = store_val(&slab, 2);
+
+ assert_eq!(get_val(&slab, idx1), Some(1));
+ assert_eq!(get_val(&slab, idx2), Some(2));
+
+ let s = slab.clone();
+ let t1 = thread::spawn(move || {
+ s.remove(idx1);
+ let value = get_val(&s, idx1);
+
+ // We may or may not see the new value yet, depending on when
+ // this occurs, but we must either see the new value or `None`;
+ // the old value has been removed!
+ assert!(value == None || value == Some(3));
+ });
+
+ let idx3 = store_when_free(&slab, 3);
+ t1.join().expect("thread 1 should not panic");
+
+ assert_eq!(get_val(&slab, idx3), Some(3));
+ assert_eq!(get_val(&slab, idx2), Some(2));
+ });
+}
+
+#[test]
+fn concurrent_alloc_remove() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+ let pair = Arc::new((Mutex::new(None), Condvar::new()));
+
+ let slab2 = slab.clone();
+ let pair2 = pair.clone();
+ let remover = thread::spawn(move || {
+ let (lock, cvar) = &*pair2;
+ for _ in 0..2 {
+ let mut next = lock.lock().unwrap();
+ while next.is_none() {
+ next = cvar.wait(next).unwrap();
+ }
+ let key = next.take().unwrap();
+ slab2.remove(key);
+ assert_eq!(get_val(&slab2, key), None);
+ cvar.notify_one();
+ }
+ });
+
+ let (lock, cvar) = &*pair;
+ for i in 0..2 {
+ let key = store_val(&slab, i);
+
+ let mut next = lock.lock().unwrap();
+ *next = Some(key);
+ cvar.notify_one();
+
+ // Wait for the item to be removed.
+ while next.is_some() {
+ next = cvar.wait(next).unwrap();
+ }
+
+ assert_eq!(get_val(&slab, key), None);
+ }
+
+ remover.join().unwrap();
+ })
+}
+
+#[test]
+fn concurrent_remove_remote_and_reuse() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+
+ let idx1 = store_val(&slab, 1);
+ let idx2 = store_val(&slab, 2);
+
+ assert_eq!(get_val(&slab, idx1), Some(1));
+ assert_eq!(get_val(&slab, idx2), Some(2));
+
+ let s = slab.clone();
+ let s2 = slab.clone();
+ let t1 = thread::spawn(move || {
+ s.remove(idx1);
+ });
+
+ let t2 = thread::spawn(move || {
+ s2.remove(idx2);
+ });
+
+ let idx3 = store_when_free(&slab, 3);
+ t1.join().expect("thread 1 should not panic");
+ t2.join().expect("thread 2 should not panic");
+
+ assert!(get_val(&slab, idx1).is_none());
+ assert!(get_val(&slab, idx2).is_none());
+ assert_eq!(get_val(&slab, idx3), Some(3));
+ });
+}
+
+#[test]
+fn alloc_remove_get() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+ let pair = Arc::new((Mutex::new(None), Condvar::new()));
+
+ let slab2 = slab.clone();
+ let pair2 = pair.clone();
+ let t1 = thread::spawn(move || {
+ let slab = slab2;
+ let (lock, cvar) = &*pair2;
+ // allocate one entry just so that we have to use the final one for
+ // all future allocations.
+ let _key0 = store_val(&slab, 0);
+ let key = store_val(&slab, 1);
+
+ let mut next = lock.lock().unwrap();
+ *next = Some(key);
+ cvar.notify_one();
+ // remove the second entry
+ slab.remove(key);
+ // store a new readiness at the same location (since the slab
+ // already has an entry in slot 0)
+ store_val(&slab, 2);
+ });
+
+ let (lock, cvar) = &*pair;
+ // wait for the second entry to be stored...
+ let mut next = lock.lock().unwrap();
+ while next.is_none() {
+ next = cvar.wait(next).unwrap();
+ }
+ let key = next.unwrap();
+
+ // our generation will be stale when the second store occurs at that
+ // index, so we must not see the value of that store.
+ let val = get_val(&slab, key);
+ assert_ne!(val, Some(2), "generation must have advanced!");
+
+ t1.join().unwrap();
+ })
+}
+
+#[test]
+fn alloc_remove_set() {
+ loom::model(|| {
+ let slab = Arc::new(Slab::new());
+ let pair = Arc::new((Mutex::new(None), Condvar::new()));
+
+ let slab2 = slab.clone();
+ let pair2 = pair.clone();
+ let t1 = thread::spawn(move || {
+ let slab = slab2;
+ let (lock, cvar) = &*pair2;
+ // allocate one entry just so that we have to use the final one for
+ // all future allocations.
+ let _key0 = store_val(&slab, 0);
+ let key = store_val(&slab, 1);
+
+ let mut next = lock.lock().unwrap();
+ *next = Some(key);
+ cvar.notify_one();
+
+ slab.remove(key);
+ // remove the old entry and insert a new one, with a new generation.
+ let key2 = slab.alloc().expect("store key 2");
+ // after the remove, we must not see the value written with the
+ // stale index.
+ assert_eq!(
+ get_val(&slab, key),
+ None,
+ "stale set must no longer be visible"
+ );
+ assert_eq!(get_val(&slab, key2), Some(0));
+ key2
+ });
+
+ let (lock, cvar) = &*pair;
+
+ // wait for the second entry to be stored. the index we get from the
+ // other thread may become stale after a write.
+ let mut next = lock.lock().unwrap();
+ while next.is_none() {
+ next = cvar.wait(next).unwrap();
+ }
+ let key = next.unwrap();
+
+ // try to write to the index with our generation
+ slab.get(key).map(|val| val.set_readiness(key, |_| 2));
+
+ let key2 = t1.join().unwrap();
+ // after the remove, we must not see the value written with the
+ // stale index either.
+ assert_eq!(
+ get_val(&slab, key),
+ None,
+ "stale set must no longer be visible"
+ );
+ assert_eq!(get_val(&slab, key2), Some(0));
+ });
+}
+
+fn get_val(slab: &Arc<Slab<ScheduledIo>>, address: Address) -> Option<usize> {
+ slab.get(address).and_then(|s| s.get_readiness(address))
+}
+
+fn store_val(slab: &Arc<Slab<ScheduledIo>>, readiness: usize) -> Address {
+ let key = slab.alloc().expect("allocate slot");
+
+ if let Some(slot) = slab.get(key) {
+ slot.set_readiness(key, |_| readiness)
+ .expect("generation should still be valid!");
+ } else {
+ panic!("slab did not contain a value for {:?}", key);
+ }
+
+ key
+}
+
+fn store_when_free(slab: &Arc<Slab<ScheduledIo>>, readiness: usize) -> Address {
+ let key = loop {
+ if let Some(key) = slab.alloc() {
+ break key;
+ }
+
+ thread::yield_now();
+ };
+
+ if let Some(slot) = slab.get(key) {
+ slot.set_readiness(key, |_| readiness)
+ .expect("generation should still be valid!");
+ } else {
+ panic!("slab did not contain a value for {:?}", key);
+ }
+
+ key
+}
diff --git a/third_party/rust/tokio/src/util/slab/tests/loom_stack.rs b/third_party/rust/tokio/src/util/slab/tests/loom_stack.rs
new file mode 100644
index 0000000000..47ad46d3a1
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/tests/loom_stack.rs
@@ -0,0 +1,88 @@
+use crate::util::slab::TransferStack;
+
+use loom::cell::UnsafeCell;
+use loom::sync::Arc;
+use loom::thread;
+
+#[test]
+fn transfer_stack() {
+ loom::model(|| {
+ let causalities = [UnsafeCell::new(None), UnsafeCell::new(None)];
+ let shared = Arc::new((causalities, TransferStack::new()));
+ let shared1 = shared.clone();
+ let shared2 = shared.clone();
+
+ // Spawn two threads that both try to push to the stack.
+ let t1 = thread::spawn(move || {
+ let (causalities, stack) = &*shared1;
+ stack.push(0, |prev| {
+ causalities[0].with_mut(|c| unsafe {
+ *c = Some(prev);
+ });
+ });
+ });
+
+ let t2 = thread::spawn(move || {
+ let (causalities, stack) = &*shared2;
+ stack.push(1, |prev| {
+ causalities[1].with_mut(|c| unsafe {
+ *c = Some(prev);
+ });
+ });
+ });
+
+ let (causalities, stack) = &*shared;
+
+ // Try to pop from the stack...
+ let mut idx = stack.pop_all();
+ while idx == None {
+ idx = stack.pop_all();
+ thread::yield_now();
+ }
+ let idx = idx.unwrap();
+
+ let saw_both = causalities[idx].with(|val| {
+ let val = unsafe { *val };
+ assert!(
+ val.is_some(),
+ "UnsafeCell write must happen-before index is pushed to the stack!",
+ );
+ // were there two entries in the stack? if so, check that
+ // both saw a write.
+ if let Some(c) = causalities.get(val.unwrap()) {
+ c.with(|val| {
+ let val = unsafe { *val };
+ assert!(
+ val.is_some(),
+ "UnsafeCell write must happen-before index is pushed to the stack!",
+ );
+ });
+ true
+ } else {
+ false
+ }
+ });
+
+ // We only saw one push. Ensure that the other push happens too.
+ if !saw_both {
+ // Try to pop from the stack...
+ let mut idx = stack.pop_all();
+ while idx == None {
+ idx = stack.pop_all();
+ thread::yield_now();
+ }
+ let idx = idx.unwrap();
+
+ causalities[idx].with(|val| {
+ let val = unsafe { *val };
+ assert!(
+ val.is_some(),
+ "UnsafeCell write must happen-before index is pushed to the stack!",
+ );
+ });
+ }
+
+ t1.join().unwrap();
+ t2.join().unwrap();
+ });
+}
diff --git a/third_party/rust/tokio/src/util/slab/tests/mod.rs b/third_party/rust/tokio/src/util/slab/tests/mod.rs
new file mode 100644
index 0000000000..7f79354466
--- /dev/null
+++ b/third_party/rust/tokio/src/util/slab/tests/mod.rs
@@ -0,0 +1,2 @@
+mod loom_slab;
+mod loom_stack;