use crate::{
    cfg::{self, CfgPrivate},
    clear::Clear,
    page,
    sync::{
        alloc,
        atomic::{
            AtomicPtr, AtomicUsize,
            Ordering::{self, *},
        },
    },
    tid::Tid,
    Pack,
};

use std::{fmt, ptr, slice};

// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │   page 3    │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │   page n    │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                         ...
pub(crate) struct Shard<T, C: cfg::Config> {
    /// The shard's parent thread ID.
    pub(crate) tid: usize,
    /// The local free list for each page.
    ///
    /// These are only ever accessed from this shard's thread, so they are
    /// stored separately from the shared state for the page that can be
    /// accessed concurrently, to minimize false sharing.
    local: Box<[page::Local]>,
    /// The shared state for each page in this shard.
    ///
    /// This consists of the page's metadata (size, previous size), remote free
    /// list, and a pointer to the actual array backing that page.
    shared: Box<[page::Shared<T, C>]>,
}

pub(crate) struct Array<T, C: cfg::Config> {
    shards: Box<[Ptr<T, C>]>,
    max: AtomicUsize,
}

#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);

#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);

// === impl Shard ===

impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
    #[inline(always)]
    pub(crate) fn with_slot<'a, U>(
        &'a self,
        idx: usize,
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);
        if page_index > self.shared.len() {
            return None;
        }

        self.shared[page_index].with_slot(addr, f)
    }

    pub(crate) fn new(tid: usize) -> Self {
        let mut total_sz = 0;
        let shared = (0..C::MAX_PAGES)
            .map(|page_num| {
                let sz = C::page_size(page_num);
                let prev_sz = total_sz;
                total_sz += sz;
                page::Shared::new(sz, prev_sz)
            })
            .collect();
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
        Self { tid, local, shared }
    }
}
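
// A shard is constructed (in `new`, above) with one `page::Local` /
// `page::Shared` pair per possible page (`C::MAX_PAGES`); each shared page is
// told its own size and the combined size of all pages before it (`prev_sz`),
// which is the layout information later used to resolve a packed index into a
// page and an offset within that page.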

impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Remove an item on the shard's local thread.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> remove_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Remove an item, while on a different thread from the shard's local thread.
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}
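
// Operations on a shard generally come in `_local`/`_remote` pairs: the
// `_local` variants run on the thread that owns the shard and hand the slot
// back to that page's local free list, which is only ever touched by this
// thread, while the `_remote` variants run on other threads and use the
// page's shared free list (`shared.free_list()`) instead, which is safe to
// push to concurrently.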

impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
    pub(crate) fn init_with<U>(
        &self,
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        // Can we fit the value into an existing page?
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(res) = page.init_with(local, &mut init) {
                return Some(res);
            }
        }

        None
    }

    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    pub(crate) fn clear_after_release(&self, idx: usize) {
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
        let tid = Tid::<C>::current().as_usize();
        test_println!(
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
            tid,
            self.tid
        );
        if tid == self.tid {
            self.clear_local(idx);
        } else {
            self.clear_remote(idx);
        }
    }

    fn clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
    }

    fn clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        if page_index > self.shared.len() {
            return false;
        }

        let shared = &self.shared[page_index];
        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
    }

    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq_in_drop!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );
        &self.local[i]
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut d = f.debug_struct("Shard");

        #[cfg(debug_assertions)]
        d.field("tid", &self.tid);
        d.field("shared", &self.shared).finish()
    }
}

// === impl Array ===

impl<T, C> Array<T, C>
where
    C: cfg::Config,
{
    pub(crate) fn new() -> Self {
        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
        for _ in 0..C::MAX_SHARDS {
            // XXX(eliza): T_T this could be avoided with maybeuninit or something...
            shards.push(Ptr::null());
        }
        Self {
            shards: shards.into(),
            max: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
        test_println!("-> get shard={}", idx);
        self.shards.get(idx)?.load(Acquire)
    }

    #[inline]
    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
        let tid = Tid::<C>::current();
        test_println!("current: {:?}", tid);
        let idx = tid.as_usize();
        assert!(
            idx < self.shards.len(),
            "Thread count overflowed the configured max count. \
             Thread index = {}, max threads = {}.",
            idx,
            C::MAX_SHARDS,
        );
        // It's okay for this to be relaxed. The value is only ever stored by
        // the thread that corresponds to the index, and we are that thread.
        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
            self.shards[idx].set(ptr);
            let mut max = self.max.load(Acquire);
            while max < idx {
                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
                    Ok(_) => break,
                    Err(actual) => max = actual,
                }
            }
            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
            unsafe {
                // Safety: we just put it there!
                &*ptr
            }
            .get_ref()
        });
        (tid, shard)
    }

    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
        test_println!("Array::iter_mut");
        let max = self.max.load(Acquire);
        test_println!("-> highest index={}", max);
        IterMut(self.shards[0..=max].iter_mut())
    }
}
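
// `max` tracks the highest shard index that has ever been allocated, and is
// only ever advanced (by the CAS loop in `current`, above). `iter_mut`, the
// `Debug` impl, and `Drop` below can therefore use `0..=max` as the range of
// indices whose shard pointers may be non-null.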

impl<T, C: cfg::Config> Drop for Array<T, C> {
    fn drop(&mut self) {
        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper
        // for std atomics to change `get_mut` to `with_mut`...
        let max = self.max.load(Acquire);
        for shard in &self.shards[0..=max] {
            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper
            // for std atomics to change `get_mut` to `with_mut`...
            let ptr = shard.0.load(Acquire);
            if ptr.is_null() {
                continue;
            }
            let shard = unsafe {
                // Safety: this is the only place where these boxes are
                // deallocated, and we have exclusive access to the shard array,
                // because...we are dropping it...
                Box::from_raw(ptr)
            };
            drop(shard)
        }
    }
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let max = self.max.load(Acquire);
        let mut set = f.debug_map();
        for shard in &self.shards[0..=max] {
            let ptr = shard.0.load(Acquire);
            if let Some(shard) = ptr::NonNull::new(ptr) {
                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
            } else {
                set.entry(&format_args!("{:p}", ptr), &());
            }
        }
        set.finish()
    }
}

// === impl Ptr ===

impl<T, C: cfg::Config> Ptr<T, C> {
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let ptr = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", ptr, order);

        if ptr.is_null() {
            test_println!("---> null");
            return None;
        }

        let track = unsafe {
            // Safety: The returned reference will have the same lifetime as the
            // reference to the shard pointer, which (morally, if not actually)
            // owns the shard. The shard is only deallocated when the shard
            // array is dropped, and it won't be dropped while this pointer is
            // borrowed --- and the returned reference has the same lifetime.
            //
            // We know that the pointer is not null, because we just
            // null-checked it immediately prior.
            &*ptr
        };

        Some(track.get_ref())
    }

    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        self.0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}

// === Iterators ===

impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;
    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            // Skip over empty indices if they are less than the highest
            // allocated shard. Some threads may have accessed the slab
            // (generating a thread ID) but never actually inserted data, so
            // they may have never allocated a shard.
            let next = self.0.next();
            test_println!("-> next.is_some={}", next.is_some());
            if let Some(shard) = next?.load(Acquire) {
                test_println!("-> done");
                return Some(shard);
            }
        }
    }
}
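
// A minimal, illustrative sketch of how the types above fit together, written
// as an in-module test. It only exercises behavior visible in this file (a
// null `Ptr` loads as `None`; `Array::current` lazily allocates the calling
// thread's shard). It assumes `crate::cfg::DefaultConfig` is the crate's
// default `Config` implementation and that the `loom` cfg gates the simulated
// atomics; adjust both if they differ.
#[cfg(all(test, not(loom)))]
mod sketch {
    use super::*;

    #[test]
    fn null_ptr_loads_none() {
        // A freshly-created pointer slot holds null, so `load` returns `None`.
        let ptr = Ptr::<String, crate::cfg::DefaultConfig>::null();
        assert!(ptr.load(Acquire).is_none());
    }

    #[test]
    fn current_allocates_this_threads_shard() {
        let array = Array::<String, crate::cfg::DefaultConfig>::new();

        // `current` allocates a shard for the calling thread's ID on first use...
        let (tid, shard) = array.current();
        assert_eq!(shard.tid, tid.as_usize());

        // ...and the same shard is then visible through `get`.
        assert!(array.get(tid.as_usize()).is_some());
    }
}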