Diffstat (limited to 'vendor/sharded-slab/src/shard.rs')
-rw-r--r--  vendor/sharded-slab/src/shard.rs  432
1 file changed, 432 insertions(+), 0 deletions(-)
diff --git a/vendor/sharded-slab/src/shard.rs b/vendor/sharded-slab/src/shard.rs
new file mode 100644
index 000000000..0d054d7e5
--- /dev/null
+++ b/vendor/sharded-slab/src/shard.rs
@@ -0,0 +1,432 @@
+use crate::{
+    cfg::{self, CfgPrivate},
+    clear::Clear,
+    page,
+    sync::{
+        alloc,
+        atomic::{
+            AtomicPtr, AtomicUsize,
+            Ordering::{self, *},
+        },
+    },
+    tid::Tid,
+    Pack,
+};
+
+use std::{fmt, ptr, slice};
+
+// ┌─────────────┐      ┌────────┐
+// │ page 1      │      │        │
+// ├─────────────┤ ┌───▶│  next──┼─┐
+// │ page 2      │ │    ├────────┤ │
+// │             │ │    │XXXXXXXX│ │
+// │ local_free──┼─┘    ├────────┤ │
+// │ global_free─┼─┐    │        │◀┘
+// ├─────────────┤ └───▶│  next──┼─┐
+// │ page 3      │      ├────────┤ │
+// └─────────────┘      │XXXXXXXX│ │
+//       ...            ├────────┤ │
+// ┌─────────────┐      │XXXXXXXX│ │
+// │ page n      │      ├────────┤ │
+// └─────────────┘      │        │◀┘
+//                      │  next──┼───▶
+//                      ├────────┤
+//                      │XXXXXXXX│
+//                      └────────┘
+//                         ...
+pub(crate) struct Shard<T, C: cfg::Config> {
+    /// The shard's parent thread ID.
+    pub(crate) tid: usize,
+    /// The local free list for each page.
+    ///
+    /// These are only ever accessed from this shard's thread, so they are
+    /// stored separately from the page's shared state, which may be accessed
+    /// concurrently, in order to minimize false sharing.
+    local: Box<[page::Local]>,
+    /// The shared state for each page in this shard.
+    ///
+    /// This consists of the page's metadata (size, previous size), remote free
+    /// list, and a pointer to the actual array backing that page.
+    shared: Box<[page::Shared<T, C>]>,
+}
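+
+// A packed index carries everything needed to locate a slot. Roughly (the
+// exact bit widths, and which bits are used, depend on the `Config`):
+//
+//     ┌───────────┬────────────┬───────────────┐
+//     │ thread ID │ generation │ page + offset │   (most significant on the left)
+//     └───────────┴────────────┴───────────────┘
+//
+// `Tid::<C>::from_packed(idx)` recovers the owning shard's thread ID,
+// `C::unpack_gen(idx)` recovers the slot's generation, and
+// `page::indices::<C>(idx)` yields the in-page address and page index used
+// throughout this file.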
+
+pub(crate) struct Array<T, C: cfg::Config> {
+    shards: Box<[Ptr<T, C>]>,
+    max: AtomicUsize,
+}
+
+#[derive(Debug)]
+struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
+
+#[derive(Debug)]
+pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
+
+// === impl Shard ===
+
+impl<T, C> Shard<T, C>
+where
+    C: cfg::Config,
+{
+    #[inline(always)]
+    pub(crate) fn with_slot<'a, U>(
+        &'a self,
+        idx: usize,
+        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
+    ) -> Option<U> {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        test_println!("-> {:?}", addr);
+        if page_index >= self.shared.len() {
+            return None;
+        }
+
+        self.shared[page_index].with_slot(addr, f)
+    }
+
+    pub(crate) fn new(tid: usize) -> Self {
+        let mut total_sz = 0;
+        let shared = (0..C::MAX_PAGES)
+            .map(|page_num| {
+                let sz = C::page_size(page_num);
+                let prev_sz = total_sz;
+                total_sz += sz;
+                page::Shared::new(sz, prev_sz)
+            })
+            .collect();
+        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
+        Self { tid, local, shared }
+    }
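+
+    // For example, assuming page sizes double from an initial page of 32
+    // slots (the crate's default configuration), `new` builds pages with:
+    //
+    //     page 0: sz = 32,  prev_sz = 0
+    //     page 1: sz = 64,  prev_sz = 32
+    //     page 2: sz = 128, prev_sz = 96
+    //
+    // `prev_sz` is the total slot count of all earlier pages, which lets a
+    // packed address be translated into a (page, offset) pair with simple
+    // arithmetic.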
+}
+
+impl<T, C> Shard<Option<T>, C>
+where
+    C: cfg::Config,
+{
+    /// Remove and return an item on the shard's local thread.
+    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        test_println!("-> take_local {:?}", addr);
+
+        self.shared
+            .get(page_index)?
+            .take(addr, C::unpack_gen(idx), self.local(page_index))
+    }
+
+    /// Remove and return an item, while on a different thread from the
+    /// shard's local thread.
+    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        debug_assert!(Tid::<C>::current().as_usize() != self.tid);
+
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);
+
+        let shared = self.shared.get(page_index)?;
+        shared.take(addr, C::unpack_gen(idx), shared.free_list())
+    }
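+
+    // Callers are expected to pick between these two paths based on the
+    // current thread, just as `clear_after_release` does below. Roughly:
+    //
+    //     if Tid::<C>::current().as_usize() == shard.tid {
+    //         shard.take_local(idx) // unsynchronized local free list
+    //     } else {
+    //         shard.take_remote(idx) // atomic shared free list
+    //     }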
+
+    /// Mark an item as removed on the shard's local thread.
+    pub(crate) fn remove_local(&self, idx: usize) -> bool {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        if page_index >= self.shared.len() {
+            return false;
+        }
+
+        self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
+    }
+
+    /// Mark an item as removed, while on a different thread from the shard's
+    /// local thread.
+    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        if page_index >= self.shared.len() {
+            return false;
+        }
+
+        let shared = &self.shared[page_index];
+        shared.remove(addr, C::unpack_gen(idx), shared.free_list())
+    }
+
+    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
+        self.shared.iter()
+    }
+}
+
+impl<T, C> Shard<T, C>
+where
+    T: Clear + Default,
+    C: cfg::Config,
+{
+    pub(crate) fn init_with<U>(
+        &self,
+        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
+    ) -> Option<U> {
+        // Can we fit the value into an existing page?
+        for (page_idx, page) in self.shared.iter().enumerate() {
+            let local = self.local(page_idx);
+
+            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);
+
+            if let Some(res) = page.init_with(local, &mut init) {
+                return Some(res);
+            }
+        }
+
+        None
+    }
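+
+    // `init_with` is a first-fit search: page 0 is tried first, and larger
+    // pages are touched only once the earlier ones fill up. An insert path
+    // might drive it roughly like this (the slot-level calls here are
+    // illustrative, not the exact `page::Slot` API):
+    //
+    //     let mut value = Some(value);
+    //     shard.init_with(|idx, slot| {
+    //         let gen = slot.insert(&mut value)?;
+    //         Some(gen.pack(idx))
+    //     })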
+
+    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        if page_index >= self.shared.len() {
+            return false;
+        }
+
+        self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
+    }
+
+    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        if page_index >= self.shared.len() {
+            return false;
+        }
+
+        let shared = &self.shared[page_index];
+        shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
+    }
+
+    pub(crate) fn clear_after_release(&self, idx: usize) {
+        // This acquire fence presumably pairs with the release operation
+        // performed when the slot's last guard was dropped, making the
+        // releasing thread's writes visible before the slot is cleared.
+        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
+        let tid = Tid::<C>::current().as_usize();
+        test_println!(
+            "-> clear_after_release; current.tid={:?}; self.tid={:?};",
+            tid,
+            self.tid
+        );
+        if tid == self.tid {
+            self.clear_local(idx);
+        } else {
+            self.clear_remote(idx);
+        }
+    }
+
+    fn clear_local(&self, idx: usize) -> bool {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        if page_index >= self.shared.len() {
+            return false;
+        }
+
+        self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
+    }
+
+    fn clear_remote(&self, idx: usize) -> bool {
+        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
+        let (addr, page_index) = page::indices::<C>(idx);
+
+        if page_index >= self.shared.len() {
+            return false;
+        }
+
+        let shared = &self.shared[page_index];
+        shared.clear(addr, C::unpack_gen(idx), shared.free_list())
+    }
+
+    #[inline(always)]
+    fn local(&self, i: usize) -> &page::Local {
+        #[cfg(debug_assertions)]
+        debug_assert_eq_in_drop!(
+            Tid::<C>::current().as_usize(),
+            self.tid,
+            "tried to access local data from another thread!"
+        );
+
+        &self.local[i]
+    }
+}
+
+impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut d = f.debug_struct("Shard");
+
+        #[cfg(debug_assertions)]
+        d.field("tid", &self.tid);
+        d.field("shared", &self.shared).finish()
+    }
+}
+
+// === impl Array ===
+
+impl<T, C> Array<T, C>
+where
+    C: cfg::Config,
+{
+    pub(crate) fn new() -> Self {
+        let mut shards = Vec::with_capacity(C::MAX_SHARDS);
+        for _ in 0..C::MAX_SHARDS {
+            // XXX(eliza): T_T this could be avoided with `MaybeUninit` or something...
+            shards.push(Ptr::null());
+        }
+        Self {
+            shards: shards.into(),
+            max: AtomicUsize::new(0),
+        }
+    }
+
+    #[inline]
+    pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
+        test_println!("-> get shard={}", idx);
+        self.shards.get(idx)?.load(Acquire)
+    }
+
+    #[inline]
+    pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
+        let tid = Tid::<C>::current();
+        test_println!("current: {:?}", tid);
+        let idx = tid.as_usize();
+        assert!(
+            idx < self.shards.len(),
+            "Thread count overflowed the configured max count. \
+             Thread index = {}, max threads = {}.",
+            idx,
+            C::MAX_SHARDS,
+        );
+        // It's okay for this to be relaxed: the value is only ever stored by
+        // the thread that corresponds to the index, and we are that thread.
+        let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
+            let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
+            test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
+            self.shards[idx].set(ptr);
+            let mut max = self.max.load(Acquire);
+            while max < idx {
+                match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
+                    Ok(_) => break,
+                    Err(actual) => max = actual,
+                }
+            }
+            test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
+            unsafe {
+                // Safety: we just put it there!
+                &*ptr
+            }
+            .get_ref()
+        });
+        (tid, shard)
+    }
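+
+    // Design note: the compare-exchange loop in `current` is a hand-rolled
+    // "store the maximum"; with std atomics it could be written as
+    //
+    //     self.max.fetch_max(idx, AcqRel);
+    //
+    // (a sketch; this crate routes its atomics through the `sync` shim, which
+    // may be why the explicit loop is used instead).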
+
+    pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
+        test_println!("Array::iter_mut");
+        let max = self.max.load(Acquire);
+        test_println!("-> highest index={}", max);
+        IterMut(self.shards[0..=max].iter_mut())
+    }
+}
+
+impl<T, C: cfg::Config> Drop for Array<T, C> {
+    fn drop(&mut self) {
+        // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
+        let max = self.max.load(Acquire);
+        for shard in &self.shards[0..=max] {
+            // XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
+            let ptr = shard.0.load(Acquire);
+            if ptr.is_null() {
+                continue;
+            }
+            let shard = unsafe {
+                // Safety: this is the only place where these boxes are
+                // deallocated, and we have exclusive access to the shard array,
+                // because...we are dropping it...
+                Box::from_raw(ptr)
+            };
+            drop(shard);
+        }
+    }
+}
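+
+// As the XXX comments above note, `drop` and `iter_mut` take `&mut self`, so
+// with std atomics the loads could become direct accesses via `get_mut`, e.g.
+// (a sketch over std types only):
+//
+//     let max = *self.max.get_mut();
+//     for shard in self.shards.iter_mut().take(max + 1) {
+//         let ptr = *shard.0.get_mut();
+//         // ...deallocate ptr...
+//     }
+//
+// Doing this portably would require the wrapper those comments describe,
+// unifying std's `get_mut` with a loom-style `with_mut`.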
+
+impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let max = self.max.load(Acquire);
+        let mut set = f.debug_map();
+        for shard in &self.shards[0..=max] {
+            let ptr = shard.0.load(Acquire);
+            if let Some(shard) = ptr::NonNull::new(ptr) {
+                set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
+            } else {
+                set.entry(&format_args!("{:p}", ptr), &());
+            }
+        }
+        set.finish()
+    }
+}
+
+// === impl Ptr ===
+
+impl<T, C: cfg::Config> Ptr<T, C> {
+    #[inline]
+    fn null() -> Self {
+        Self(AtomicPtr::new(ptr::null_mut()))
+    }
+
+    #[inline]
+    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
+        let ptr = self.0.load(order);
+        test_println!("---> loaded={:p} (order={:?})", ptr, order);
+        if ptr.is_null() {
+            test_println!("---> null");
+            return None;
+        }
+        let track = unsafe {
+            // Safety: The returned reference will have the same lifetime as the
+            // reference to the shard pointer, which (morally, if not actually)
+            // owns the shard. The shard is only deallocated when the shard
+            // array is dropped, and it won't be dropped while this pointer is
+            // borrowed --- and the returned reference has the same lifetime.
+            //
+            // We know that the pointer is not null, because we just
+            // null-checked it immediately prior.
+            &*ptr
+        };
+
+        Some(track.get_ref())
+    }
+
+    #[inline]
+    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
+        self.0
+            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
+            .expect("a shard can only be inserted by the thread that owns it; this is a bug!");
+    }
+}
+
+// === Iterators ===
+
+impl<'a, T, C> Iterator for IterMut<'a, T, C>
+where
+    T: 'a,
+    C: cfg::Config + 'a,
+{
+    type Item = &'a Shard<T, C>;
+    fn next(&mut self) -> Option<Self::Item> {
+        test_println!("IterMut::next");
+        loop {
+            // Skip over empty indices if they are less than the highest
+            // allocated shard. Some threads may have accessed the slab
+            // (generating a thread ID) but never actually inserted data, so
+            // they may have never allocated a shard.
+            let next = self.0.next();
+            test_println!("-> next.is_some={}", next.is_some());
+            if let Some(shard) = next?.load(Acquire) {
+                test_println!("-> done");
+                return Some(shard);
+            }
+        }
+    }
+}