diff options
Diffstat (limited to 'third_party/rust/tokio/src/util')
-rw-r--r-- | third_party/rust/tokio/src/util/atomic_cell.rs | 51 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/bit.rs | 77 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/error.rs | 17 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/idle_notified_set.rs | 463 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/linked_list.rs | 693 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/mod.rs | 83 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/pad.rs | 52 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/rand.rs | 64 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/slab.rs | 855 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/sync_wrapper.rs | 26 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/trace.rs | 99 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/try_lock.rs | 80 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/vec_deque_cell.rs | 53 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/wake.rs | 80 | ||||
-rw-r--r-- | third_party/rust/tokio/src/util/wake_list.rs | 53 |
15 files changed, 2746 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/util/atomic_cell.rs b/third_party/rust/tokio/src/util/atomic_cell.rs new file mode 100644 index 0000000000..07e37303a7 --- /dev/null +++ b/third_party/rust/tokio/src/util/atomic_cell.rs @@ -0,0 +1,51 @@ +use crate::loom::sync::atomic::AtomicPtr; + +use std::ptr; +use std::sync::atomic::Ordering::AcqRel; + +pub(crate) struct AtomicCell<T> { + data: AtomicPtr<T>, +} + +unsafe impl<T: Send> Send for AtomicCell<T> {} +unsafe impl<T: Send> Sync for AtomicCell<T> {} + +impl<T> AtomicCell<T> { + pub(crate) fn new(data: Option<Box<T>>) -> AtomicCell<T> { + AtomicCell { + data: AtomicPtr::new(to_raw(data)), + } + } + + pub(crate) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> { + let old = self.data.swap(to_raw(val), AcqRel); + from_raw(old) + } + + pub(crate) fn set(&self, val: Box<T>) { + let _ = self.swap(Some(val)); + } + + pub(crate) fn take(&self) -> Option<Box<T>> { + self.swap(None) + } +} + +fn to_raw<T>(data: Option<Box<T>>) -> *mut T { + data.map(Box::into_raw).unwrap_or(ptr::null_mut()) +} + +fn from_raw<T>(val: *mut T) -> Option<Box<T>> { + if val.is_null() { + None + } else { + Some(unsafe { Box::from_raw(val) }) + } +} + +impl<T> Drop for AtomicCell<T> { + fn drop(&mut self) { + // Free any data still held by the cell + let _ = self.take(); + } +} diff --git a/third_party/rust/tokio/src/util/bit.rs b/third_party/rust/tokio/src/util/bit.rs new file mode 100644 index 0000000000..a43c2c2d36 --- /dev/null +++ b/third_party/rust/tokio/src/util/bit.rs @@ -0,0 +1,77 @@ +use std::fmt; + +#[derive(Clone, Copy, PartialEq)] +pub(crate) struct Pack { + mask: usize, + shift: u32, +} + +impl Pack { + /// Value is packed in the `width` least-significant bits. + pub(crate) const fn least_significant(width: u32) -> Pack { + let mask = mask_for(width); + + Pack { mask, shift: 0 } + } + + /// Value is packed in the `width` more-significant bits. + pub(crate) const fn then(&self, width: u32) -> Pack { + let shift = pointer_width() - self.mask.leading_zeros(); + let mask = mask_for(width) << shift; + + Pack { mask, shift } + } + + /// Width, in bits, dedicated to storing the value. + pub(crate) const fn width(&self) -> u32 { + pointer_width() - (self.mask >> self.shift).leading_zeros() + } + + /// Max representable value. + pub(crate) const fn max_value(&self) -> usize { + (1 << self.width()) - 1 + } + + pub(crate) fn pack(&self, value: usize, base: usize) -> usize { + assert!(value <= self.max_value()); + (base & !self.mask) | (value << self.shift) + } + + /// Packs the value with `base`, losing any bits of `value` that fit. + /// + /// If `value` is larger than the max value that can be represented by the + /// allotted width, the most significant bits are truncated. + pub(crate) fn pack_lossy(&self, value: usize, base: usize) -> usize { + self.pack(value & self.max_value(), base) + } + + pub(crate) fn unpack(&self, src: usize) -> usize { + unpack(src, self.mask, self.shift) + } +} + +impl fmt::Debug for Pack { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "Pack {{ mask: {:b}, shift: {} }}", + self.mask, self.shift + ) + } +} + +/// Returns the width of a pointer in bits. +pub(crate) const fn pointer_width() -> u32 { + std::mem::size_of::<usize>() as u32 * 8 +} + +/// Returns a `usize` with the right-most `n` bits set. +pub(crate) const fn mask_for(n: u32) -> usize { + let shift = 1usize.wrapping_shl(n - 1); + shift | (shift - 1) +} + +/// Unpacks a value using a mask & shift. +pub(crate) const fn unpack(src: usize, mask: usize, shift: u32) -> usize { + (src & mask) >> shift +} diff --git a/third_party/rust/tokio/src/util/error.rs b/third_party/rust/tokio/src/util/error.rs new file mode 100644 index 0000000000..8f252c0c91 --- /dev/null +++ b/third_party/rust/tokio/src/util/error.rs @@ -0,0 +1,17 @@ +/// Error string explaining that the Tokio context hasn't been instantiated. +pub(crate) const CONTEXT_MISSING_ERROR: &str = + "there is no reactor running, must be called from the context of a Tokio 1.x runtime"; + +// some combinations of features might not use this +#[allow(dead_code)] +/// Error string explaining that the Tokio context is shutting down and cannot drive timers. +pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = + "A Tokio 1.x context was found, but it is being shutdown."; + +// some combinations of features might not use this +#[allow(dead_code)] +/// Error string explaining that the Tokio context is not available because the +/// thread-local storing it has been destroyed. This usually only happens during +/// destructors of other thread-locals. +pub(crate) const THREAD_LOCAL_DESTROYED_ERROR: &str = + "The Tokio context thread-local variable has been destroyed."; diff --git a/third_party/rust/tokio/src/util/idle_notified_set.rs b/third_party/rust/tokio/src/util/idle_notified_set.rs new file mode 100644 index 0000000000..71f3a32a85 --- /dev/null +++ b/third_party/rust/tokio/src/util/idle_notified_set.rs @@ -0,0 +1,463 @@ +//! This module defines an `IdleNotifiedSet`, which is a collection of elements. +//! Each element is intended to correspond to a task, and the collection will +//! keep track of which tasks have had their waker notified, and which have not. +//! +//! Each entry in the set holds some user-specified value. The value's type is +//! specified using the `T` parameter. It will usually be a `JoinHandle` or +//! similar. + +use std::marker::PhantomPinned; +use std::mem::ManuallyDrop; +use std::ptr::NonNull; +use std::task::{Context, Waker}; + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::{Arc, Mutex}; +use crate::util::linked_list::{self, Link}; +use crate::util::{waker_ref, Wake}; + +type LinkedList<T> = + linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>; + +/// This is the main handle to the collection. +pub(crate) struct IdleNotifiedSet<T> { + lists: Arc<Lists<T>>, + length: usize, +} + +/// A handle to an entry that is guaranteed to be stored in the idle or notified +/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet` +/// mutably to prevent the entry from being moved to the `Neither` list, which +/// only the `IdleNotifiedSet` may do. +/// +/// The main consequence of being stored in one of the lists is that the `value` +/// field has not yet been consumed. +/// +/// Note: This entry can be moved from the idle to the notified list while this +/// object exists by waking its waker. +pub(crate) struct EntryInOneOfTheLists<'a, T> { + entry: Arc<ListEntry<T>>, + set: &'a mut IdleNotifiedSet<T>, +} + +type Lists<T> = Mutex<ListsInner<T>>; + +/// The linked lists hold strong references to the ListEntry items, and the +/// ListEntry items also hold a strong reference back to the Lists object, but +/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once +/// that object is destroyed, no ref-cycles will remain. +struct ListsInner<T> { + notified: LinkedList<T>, + idle: LinkedList<T>, + /// Whenever an element in the `notified` list is woken, this waker will be + /// notified and consumed, if it exists. + waker: Option<Waker>, +} + +/// Which of the two lists in the shared Lists object is this entry stored in? +/// +/// If the value is `Idle`, then an entry's waker may move it to the notified +/// list. Otherwise, only the `IdleNotifiedSet` may move it. +/// +/// If the value is `Neither`, then it is still possible that the entry is in +/// some third external list (this happens in `drain`). +#[derive(Copy, Clone, Eq, PartialEq)] +enum List { + Notified, + Idle, + Neither, +} + +/// An entry in the list. +/// +/// # Safety +/// +/// The `my_list` field must only be accessed while holding the mutex in +/// `parent`. It is an invariant that the value of `my_list` corresponds to +/// which linked list in the `parent` holds this entry. Once this field takes +/// the value `Neither`, then it may never be modified again. +/// +/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field +/// must only be accessed while holding the mutex. If the value of `my_list` is +/// `Neither`, then the `pointers` field may be accessed by the +/// `IdleNotifiedSet` (this happens inside `drain`). +/// +/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed +/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to +/// `Neither` assumes ownership of the `value`, and it must either drop it or +/// move it out from this entry to prevent it from getting leaked. (Since the +/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the +/// value should not be leaked.) +/// +/// This type is `#[repr(C)]` because its `linked_list::Link` implementation +/// requires that `pointers` is the first field. +#[repr(C)] +struct ListEntry<T> { + /// The linked list pointers of the list this entry is in. + pointers: linked_list::Pointers<ListEntry<T>>, + /// Pointer to the shared `Lists` struct. + parent: Arc<Lists<T>>, + /// The value stored in this entry. + value: UnsafeCell<ManuallyDrop<T>>, + /// Used to remember which list this entry is in. + my_list: UnsafeCell<List>, + /// Required by the `linked_list::Pointers` field. + _pin: PhantomPinned, +} + +// With mutable access to the `IdleNotifiedSet`, you can get mutable access to +// the values. +unsafe impl<T: Send> Send for IdleNotifiedSet<T> {} +// With the current API we strictly speaking don't even need `T: Sync`, but we +// require it anyway to support adding &self APIs that access the values in the +// future. +unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {} + +// These impls control when it is safe to create a Waker. Since the waker does +// not allow access to the value in any way (including its destructor), it is +// not necessary for `T` to be Send or Sync. +unsafe impl<T> Send for ListEntry<T> {} +unsafe impl<T> Sync for ListEntry<T> {} + +impl<T> IdleNotifiedSet<T> { + /// Create a new IdleNotifiedSet. + pub(crate) fn new() -> Self { + let lists = Mutex::new(ListsInner { + notified: LinkedList::new(), + idle: LinkedList::new(), + waker: None, + }); + + IdleNotifiedSet { + lists: Arc::new(lists), + length: 0, + } + } + + pub(crate) fn len(&self) -> usize { + self.length + } + + pub(crate) fn is_empty(&self) -> bool { + self.length == 0 + } + + /// Insert the given value into the `idle` list. + pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> { + self.length += 1; + + let entry = Arc::new(ListEntry { + parent: self.lists.clone(), + value: UnsafeCell::new(ManuallyDrop::new(value)), + my_list: UnsafeCell::new(List::Idle), + pointers: linked_list::Pointers::new(), + _pin: PhantomPinned, + }); + + { + let mut lock = self.lists.lock(); + lock.idle.push_front(entry.clone()); + } + + // Safety: We just put the entry in the idle list, so it is in one of the lists. + EntryInOneOfTheLists { entry, set: self } + } + + /// Pop an entry from the notified list to poll it. The entry is moved to + /// the idle list atomically. + pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> { + // We don't decrement the length because this call moves the entry to + // the idle list rather than removing it. + if self.length == 0 { + // Fast path. + return None; + } + + let mut lock = self.lists.lock(); + + let should_update_waker = match lock.waker.as_mut() { + Some(cur_waker) => !waker.will_wake(cur_waker), + None => true, + }; + if should_update_waker { + lock.waker = Some(waker.clone()); + } + + // Pop the entry, returning None if empty. + let entry = lock.notified.pop_back()?; + + lock.idle.push_front(entry.clone()); + + // Safety: We are holding the lock. + entry.my_list.with_mut(|ptr| unsafe { + *ptr = List::Idle; + }); + + drop(lock); + + // Safety: We just put the entry in the idle list, so it is in one of the lists. + Some(EntryInOneOfTheLists { entry, set: self }) + } + + /// Call a function on every element in this list. + pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) { + fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) { + let mut node = list.last(); + + while let Some(entry) = node { + ptrs.push(entry.value.with_mut(|ptr| { + let ptr: *mut ManuallyDrop<T> = ptr; + let ptr: *mut T = ptr.cast(); + ptr + })); + + let prev = entry.pointers.get_prev(); + node = prev.map(|prev| unsafe { &*prev.as_ptr() }); + } + } + + // Atomically get a raw pointer to the value of every entry. + // + // Since this only locks the mutex once, it is not possible for a value + // to get moved from the idle list to the notified list during the + // operation, which would otherwise result in some value being listed + // twice. + let mut ptrs = Vec::with_capacity(self.len()); + { + let mut lock = self.lists.lock(); + + get_ptrs(&mut lock.idle, &mut ptrs); + get_ptrs(&mut lock.notified, &mut ptrs); + } + debug_assert_eq!(ptrs.len(), ptrs.capacity()); + + for ptr in ptrs { + // Safety: When we grabbed the pointers, the entries were in one of + // the two lists. This means that their value was valid at the time, + // and it must still be valid because we are the IdleNotifiedSet, + // and only we can remove an entry from the two lists. (It's + // possible that an entry is moved from one list to the other during + // this loop, but that is ok.) + func(unsafe { &mut *ptr }); + } + } + + /// Remove all entries in both lists, applying some function to each element. + /// + /// The closure is called on all elements even if it panics. Having it panic + /// twice is a double-panic, and will abort the application. + pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) { + if self.length == 0 { + // Fast path. + return; + } + self.length = 0; + + // The LinkedList is not cleared on panic, so we use a bomb to clear it. + // + // This value has the invariant that any entry in its `all_entries` list + // has `my_list` set to `Neither` and that the value has not yet been + // dropped. + struct AllEntries<T, F: FnMut(T)> { + all_entries: LinkedList<T>, + func: F, + } + + impl<T, F: FnMut(T)> AllEntries<T, F> { + fn pop_next(&mut self) -> bool { + if let Some(entry) = self.all_entries.pop_back() { + // Safety: We just took this value from the list, so we can + // destroy the value in the entry. + entry + .value + .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) }); + true + } else { + false + } + } + } + + impl<T, F: FnMut(T)> Drop for AllEntries<T, F> { + fn drop(&mut self) { + while self.pop_next() {} + } + } + + let mut all_entries = AllEntries { + all_entries: LinkedList::new(), + func, + }; + + // Atomically move all entries to the new linked list in the AllEntries + // object. + { + let mut lock = self.lists.lock(); + unsafe { + // Safety: We are holding the lock and `all_entries` is a new + // LinkedList. + move_to_new_list(&mut lock.idle, &mut all_entries.all_entries); + move_to_new_list(&mut lock.notified, &mut all_entries.all_entries); + } + } + + // Keep destroying entries in the list until it is empty. + // + // If the closure panics, then the destructor of the `AllEntries` bomb + // ensures that we keep running the destructor on the remaining values. + // A second panic will abort the program. + while all_entries.pop_next() {} + } +} + +/// # Safety +/// +/// The mutex for the entries must be held, and the target list must be such +/// that setting `my_list` to `Neither` is ok. +unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) { + while let Some(entry) = from.pop_back() { + entry.my_list.with_mut(|ptr| { + *ptr = List::Neither; + }); + to.push_front(entry); + } +} + +impl<'a, T> EntryInOneOfTheLists<'a, T> { + /// Remove this entry from the list it is in, returning the value associated + /// with the entry. + /// + /// This consumes the value, since it is no longer guaranteed to be in a + /// list. + pub(crate) fn remove(self) -> T { + self.set.length -= 1; + + { + let mut lock = self.set.lists.lock(); + + // Safety: We are holding the lock so there is no race, and we will + // remove the entry afterwards to uphold invariants. + let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe { + let old_my_list = *ptr; + *ptr = List::Neither; + old_my_list + }); + + let list = match old_my_list { + List::Idle => &mut lock.idle, + List::Notified => &mut lock.notified, + // An entry in one of the lists is in one of the lists. + List::Neither => unreachable!(), + }; + + unsafe { + // Safety: We just checked that the entry is in this particular + // list. + list.remove(ListEntry::as_raw(&self.entry)).unwrap(); + } + } + + // By setting `my_list` to `Neither`, we have taken ownership of the + // value. We return it to the caller. + // + // Safety: We have a mutable reference to the `IdleNotifiedSet` that + // owns this entry, so we can use its permission to access the value. + self.entry + .value + .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) }) + } + + /// Access the value in this entry together with a context for its waker. + pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U + where + F: FnOnce(&mut T, &mut Context<'_>) -> U, + T: 'static, + { + let waker = waker_ref(&self.entry); + + let mut context = Context::from_waker(&waker); + + // Safety: We have a mutable reference to the `IdleNotifiedSet` that + // owns this entry, so we can use its permission to access the value. + self.entry + .value + .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) }) + } +} + +impl<T> Drop for IdleNotifiedSet<T> { + fn drop(&mut self) { + // Clear both lists. + self.drain(drop); + + #[cfg(debug_assertions)] + if !std::thread::panicking() { + let lock = self.lists.lock(); + assert!(lock.idle.is_empty()); + assert!(lock.notified.is_empty()); + } + } +} + +impl<T: 'static> Wake for ListEntry<T> { + fn wake_by_ref(me: &Arc<Self>) { + let mut lock = me.parent.lock(); + + // Safety: We are holding the lock and we will update the lists to + // maintain invariants. + let old_my_list = me.my_list.with_mut(|ptr| unsafe { + let old_my_list = *ptr; + if old_my_list == List::Idle { + *ptr = List::Notified; + } + old_my_list + }); + + if old_my_list == List::Idle { + // We move ourself to the notified list. + let me = unsafe { + // Safety: We just checked that we are in this particular list. + lock.idle.remove(NonNull::from(&**me)).unwrap() + }; + lock.notified.push_front(me); + + if let Some(waker) = lock.waker.take() { + drop(lock); + waker.wake(); + } + } + } + + fn wake(me: Arc<Self>) { + Self::wake_by_ref(&me) + } +} + +/// # Safety +/// +/// `ListEntry` is forced to be !Unpin. +unsafe impl<T> linked_list::Link for ListEntry<T> { + type Handle = Arc<ListEntry<T>>; + type Target = ListEntry<T>; + + fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> { + let ptr: *const ListEntry<T> = Arc::as_ptr(handle); + // Safety: We can't get a null pointer from `Arc::as_ptr`. + unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) } + } + + unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> { + Arc::from_raw(ptr.as_ptr()) + } + + unsafe fn pointers( + target: NonNull<ListEntry<T>>, + ) -> NonNull<linked_list::Pointers<ListEntry<T>>> { + // Safety: The pointers struct is the first field and ListEntry is + // `#[repr(C)]` so this cast is safe. + // + // We do this rather than doing a field access since `std::ptr::addr_of` + // is too new for our MSRV. + target.cast() + } +} diff --git a/third_party/rust/tokio/src/util/linked_list.rs b/third_party/rust/tokio/src/util/linked_list.rs new file mode 100644 index 0000000000..e6bdde68c7 --- /dev/null +++ b/third_party/rust/tokio/src/util/linked_list.rs @@ -0,0 +1,693 @@ +#![cfg_attr(not(feature = "full"), allow(dead_code))] + +//! An intrusive double linked list of data. +//! +//! The data structure supports tracking pinned nodes. Most of the data +//! structure's APIs are `unsafe` as they require the caller to ensure the +//! specified node is actually contained by the list. + +use core::cell::UnsafeCell; +use core::fmt; +use core::marker::{PhantomData, PhantomPinned}; +use core::mem::ManuallyDrop; +use core::ptr::{self, NonNull}; + +/// An intrusive linked list. +/// +/// Currently, the list is not emptied on drop. It is the caller's +/// responsibility to ensure the list is empty before dropping it. +pub(crate) struct LinkedList<L, T> { + /// Linked list head + head: Option<NonNull<T>>, + + /// Linked list tail + tail: Option<NonNull<T>>, + + /// Node type marker. + _marker: PhantomData<*const L>, +} + +unsafe impl<L: Link> Send for LinkedList<L, L::Target> where L::Target: Send {} +unsafe impl<L: Link> Sync for LinkedList<L, L::Target> where L::Target: Sync {} + +/// Defines how a type is tracked within a linked list. +/// +/// In order to support storing a single type within multiple lists, accessing +/// the list pointers is decoupled from the entry type. +/// +/// # Safety +/// +/// Implementations must guarantee that `Target` types are pinned in memory. In +/// other words, when a node is inserted, the value will not be moved as long as +/// it is stored in the list. +pub(crate) unsafe trait Link { + /// Handle to the list entry. + /// + /// This is usually a pointer-ish type. + type Handle; + + /// Node type. + type Target; + + /// Convert the handle to a raw pointer without consuming the handle. + #[allow(clippy::wrong_self_convention)] + fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target>; + + /// Convert the raw pointer to a handle + unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle; + + /// Return the pointers for a node + /// + /// # Safety + /// + /// The resulting pointer should have the same tag in the stacked-borrows + /// stack as the argument. In particular, the method may not create an + /// intermediate reference in the process of creating the resulting raw + /// pointer. + unsafe fn pointers(target: NonNull<Self::Target>) -> NonNull<Pointers<Self::Target>>; +} + +/// Previous / next pointers. +pub(crate) struct Pointers<T> { + inner: UnsafeCell<PointersInner<T>>, +} +/// We do not want the compiler to put the `noalias` attribute on mutable +/// references to this type, so the type has been made `!Unpin` with a +/// `PhantomPinned` field. +/// +/// Additionally, we never access the `prev` or `next` fields directly, as any +/// such access would implicitly involve the creation of a reference to the +/// field, which we want to avoid since the fields are not `!Unpin`, and would +/// hence be given the `noalias` attribute if we were to do such an access. +/// As an alternative to accessing the fields directly, the `Pointers` type +/// provides getters and setters for the two fields, and those are implemented +/// using raw pointer casts and offsets, which is valid since the struct is +/// #[repr(C)]. +/// +/// See this link for more information: +/// <https://github.com/rust-lang/rust/pull/82834> +#[repr(C)] +struct PointersInner<T> { + /// The previous node in the list. null if there is no previous node. + /// + /// This field is accessed through pointer manipulation, so it is not dead code. + #[allow(dead_code)] + prev: Option<NonNull<T>>, + + /// The next node in the list. null if there is no previous node. + /// + /// This field is accessed through pointer manipulation, so it is not dead code. + #[allow(dead_code)] + next: Option<NonNull<T>>, + + /// This type is !Unpin due to the heuristic from: + /// <https://github.com/rust-lang/rust/pull/82834> + _pin: PhantomPinned, +} + +unsafe impl<T: Send> Send for Pointers<T> {} +unsafe impl<T: Sync> Sync for Pointers<T> {} + +// ===== impl LinkedList ===== + +impl<L, T> LinkedList<L, T> { + /// Creates an empty linked list. + pub(crate) const fn new() -> LinkedList<L, T> { + LinkedList { + head: None, + tail: None, + _marker: PhantomData, + } + } +} + +impl<L: Link> LinkedList<L, L::Target> { + /// Adds an element first in the list. + pub(crate) fn push_front(&mut self, val: L::Handle) { + // The value should not be dropped, it is being inserted into the list + let val = ManuallyDrop::new(val); + let ptr = L::as_raw(&*val); + assert_ne!(self.head, Some(ptr)); + unsafe { + L::pointers(ptr).as_mut().set_next(self.head); + L::pointers(ptr).as_mut().set_prev(None); + + if let Some(head) = self.head { + L::pointers(head).as_mut().set_prev(Some(ptr)); + } + + self.head = Some(ptr); + + if self.tail.is_none() { + self.tail = Some(ptr); + } + } + } + + /// Removes the last element from a list and returns it, or None if it is + /// empty. + pub(crate) fn pop_back(&mut self) -> Option<L::Handle> { + unsafe { + let last = self.tail?; + self.tail = L::pointers(last).as_ref().get_prev(); + + if let Some(prev) = L::pointers(last).as_ref().get_prev() { + L::pointers(prev).as_mut().set_next(None); + } else { + self.head = None + } + + L::pointers(last).as_mut().set_prev(None); + L::pointers(last).as_mut().set_next(None); + + Some(L::from_raw(last)) + } + } + + /// Returns whether the linked list does not contain any node + pub(crate) fn is_empty(&self) -> bool { + if self.head.is_some() { + return false; + } + + assert!(self.tail.is_none()); + true + } + + /// Removes the specified node from the list + /// + /// # Safety + /// + /// The caller **must** ensure that `node` is currently contained by + /// `self` or not contained by any other list. + pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> { + if let Some(prev) = L::pointers(node).as_ref().get_prev() { + debug_assert_eq!(L::pointers(prev).as_ref().get_next(), Some(node)); + L::pointers(prev) + .as_mut() + .set_next(L::pointers(node).as_ref().get_next()); + } else { + if self.head != Some(node) { + return None; + } + + self.head = L::pointers(node).as_ref().get_next(); + } + + if let Some(next) = L::pointers(node).as_ref().get_next() { + debug_assert_eq!(L::pointers(next).as_ref().get_prev(), Some(node)); + L::pointers(next) + .as_mut() + .set_prev(L::pointers(node).as_ref().get_prev()); + } else { + // This might be the last item in the list + if self.tail != Some(node) { + return None; + } + + self.tail = L::pointers(node).as_ref().get_prev(); + } + + L::pointers(node).as_mut().set_next(None); + L::pointers(node).as_mut().set_prev(None); + + Some(L::from_raw(node)) + } +} + +impl<L: Link> fmt::Debug for LinkedList<L, L::Target> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LinkedList") + .field("head", &self.head) + .field("tail", &self.tail) + .finish() + } +} + +#[cfg(any( + feature = "fs", + feature = "rt", + all(unix, feature = "process"), + feature = "signal", + feature = "sync", +))] +impl<L: Link> LinkedList<L, L::Target> { + pub(crate) fn last(&self) -> Option<&L::Target> { + let tail = self.tail.as_ref()?; + unsafe { Some(&*tail.as_ptr()) } + } +} + +impl<L: Link> Default for LinkedList<L, L::Target> { + fn default() -> Self { + Self::new() + } +} + +// ===== impl DrainFilter ===== + +cfg_io_readiness! { + pub(crate) struct DrainFilter<'a, T: Link, F> { + list: &'a mut LinkedList<T, T::Target>, + filter: F, + curr: Option<NonNull<T::Target>>, + } + + impl<T: Link> LinkedList<T, T::Target> { + pub(crate) fn drain_filter<F>(&mut self, filter: F) -> DrainFilter<'_, T, F> + where + F: FnMut(&mut T::Target) -> bool, + { + let curr = self.head; + DrainFilter { + curr, + filter, + list: self, + } + } + } + + impl<'a, T, F> Iterator for DrainFilter<'a, T, F> + where + T: Link, + F: FnMut(&mut T::Target) -> bool, + { + type Item = T::Handle; + + fn next(&mut self) -> Option<Self::Item> { + while let Some(curr) = self.curr { + // safety: the pointer references data contained by the list + self.curr = unsafe { T::pointers(curr).as_ref() }.get_next(); + + // safety: the value is still owned by the linked list. + if (self.filter)(unsafe { &mut *curr.as_ptr() }) { + return unsafe { self.list.remove(curr) }; + } + } + + None + } + } +} + +// ===== impl Pointers ===== + +impl<T> Pointers<T> { + /// Create a new set of empty pointers + pub(crate) fn new() -> Pointers<T> { + Pointers { + inner: UnsafeCell::new(PointersInner { + prev: None, + next: None, + _pin: PhantomPinned, + }), + } + } + + pub(crate) fn get_prev(&self) -> Option<NonNull<T>> { + // SAFETY: prev is the first field in PointersInner, which is #[repr(C)]. + unsafe { + let inner = self.inner.get(); + let prev = inner as *const Option<NonNull<T>>; + ptr::read(prev) + } + } + pub(crate) fn get_next(&self) -> Option<NonNull<T>> { + // SAFETY: next is the second field in PointersInner, which is #[repr(C)]. + unsafe { + let inner = self.inner.get(); + let prev = inner as *const Option<NonNull<T>>; + let next = prev.add(1); + ptr::read(next) + } + } + + fn set_prev(&mut self, value: Option<NonNull<T>>) { + // SAFETY: prev is the first field in PointersInner, which is #[repr(C)]. + unsafe { + let inner = self.inner.get(); + let prev = inner as *mut Option<NonNull<T>>; + ptr::write(prev, value); + } + } + fn set_next(&mut self, value: Option<NonNull<T>>) { + // SAFETY: next is the second field in PointersInner, which is #[repr(C)]. + unsafe { + let inner = self.inner.get(); + let prev = inner as *mut Option<NonNull<T>>; + let next = prev.add(1); + ptr::write(next, value); + } + } +} + +impl<T> fmt::Debug for Pointers<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let prev = self.get_prev(); + let next = self.get_next(); + f.debug_struct("Pointers") + .field("prev", &prev) + .field("next", &next) + .finish() + } +} + +#[cfg(test)] +#[cfg(not(loom))] +mod tests { + use super::*; + + use std::pin::Pin; + + #[derive(Debug)] + #[repr(C)] + struct Entry { + pointers: Pointers<Entry>, + val: i32, + } + + unsafe impl<'a> Link for &'a Entry { + type Handle = Pin<&'a Entry>; + type Target = Entry; + + fn as_raw(handle: &Pin<&'_ Entry>) -> NonNull<Entry> { + NonNull::from(handle.get_ref()) + } + + unsafe fn from_raw(ptr: NonNull<Entry>) -> Pin<&'a Entry> { + Pin::new_unchecked(&*ptr.as_ptr()) + } + + unsafe fn pointers(target: NonNull<Entry>) -> NonNull<Pointers<Entry>> { + target.cast() + } + } + + fn entry(val: i32) -> Pin<Box<Entry>> { + Box::pin(Entry { + pointers: Pointers::new(), + val, + }) + } + + fn ptr(r: &Pin<Box<Entry>>) -> NonNull<Entry> { + r.as_ref().get_ref().into() + } + + fn collect_list(list: &mut LinkedList<&'_ Entry, <&'_ Entry as Link>::Target>) -> Vec<i32> { + let mut ret = vec![]; + + while let Some(entry) = list.pop_back() { + ret.push(entry.val); + } + + ret + } + + fn push_all<'a>( + list: &mut LinkedList<&'a Entry, <&'_ Entry as Link>::Target>, + entries: &[Pin<&'a Entry>], + ) { + for entry in entries.iter() { + list.push_front(*entry); + } + } + + macro_rules! assert_clean { + ($e:ident) => {{ + assert!($e.pointers.get_next().is_none()); + assert!($e.pointers.get_prev().is_none()); + }}; + } + + macro_rules! assert_ptr_eq { + ($a:expr, $b:expr) => {{ + // Deal with mapping a Pin<&mut T> -> Option<NonNull<T>> + assert_eq!(Some($a.as_ref().get_ref().into()), $b) + }}; + } + + #[test] + fn const_new() { + const _: LinkedList<&Entry, <&Entry as Link>::Target> = LinkedList::new(); + } + + #[test] + fn push_and_drain() { + let a = entry(5); + let b = entry(7); + let c = entry(31); + + let mut list = LinkedList::new(); + assert!(list.is_empty()); + + list.push_front(a.as_ref()); + assert!(!list.is_empty()); + list.push_front(b.as_ref()); + list.push_front(c.as_ref()); + + let items: Vec<i32> = collect_list(&mut list); + assert_eq!([5, 7, 31].to_vec(), items); + + assert!(list.is_empty()); + } + + #[test] + fn push_pop_push_pop() { + let a = entry(5); + let b = entry(7); + + let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); + + list.push_front(a.as_ref()); + + let entry = list.pop_back().unwrap(); + assert_eq!(5, entry.val); + assert!(list.is_empty()); + + list.push_front(b.as_ref()); + + let entry = list.pop_back().unwrap(); + assert_eq!(7, entry.val); + + assert!(list.is_empty()); + assert!(list.pop_back().is_none()); + } + + #[test] + fn remove_by_address() { + let a = entry(5); + let b = entry(7); + let c = entry(31); + + unsafe { + // Remove first + let mut list = LinkedList::new(); + + push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]); + assert!(list.remove(ptr(&a)).is_some()); + assert_clean!(a); + // `a` should be no longer there and can't be removed twice + assert!(list.remove(ptr(&a)).is_none()); + assert!(!list.is_empty()); + + assert!(list.remove(ptr(&b)).is_some()); + assert_clean!(b); + // `b` should be no longer there and can't be removed twice + assert!(list.remove(ptr(&b)).is_none()); + assert!(!list.is_empty()); + + assert!(list.remove(ptr(&c)).is_some()); + assert_clean!(c); + // `b` should be no longer there and can't be removed twice + assert!(list.remove(ptr(&c)).is_none()); + assert!(list.is_empty()); + } + + unsafe { + // Remove middle + let mut list = LinkedList::new(); + + push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]); + + assert!(list.remove(ptr(&a)).is_some()); + assert_clean!(a); + + assert_ptr_eq!(b, list.head); + assert_ptr_eq!(c, b.pointers.get_next()); + assert_ptr_eq!(b, c.pointers.get_prev()); + + let items = collect_list(&mut list); + assert_eq!([31, 7].to_vec(), items); + } + + unsafe { + // Remove middle + let mut list = LinkedList::new(); + + push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]); + + assert!(list.remove(ptr(&b)).is_some()); + assert_clean!(b); + + assert_ptr_eq!(c, a.pointers.get_next()); + assert_ptr_eq!(a, c.pointers.get_prev()); + + let items = collect_list(&mut list); + assert_eq!([31, 5].to_vec(), items); + } + + unsafe { + // Remove last + // Remove middle + let mut list = LinkedList::new(); + + push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]); + + assert!(list.remove(ptr(&c)).is_some()); + assert_clean!(c); + + assert!(b.pointers.get_next().is_none()); + assert_ptr_eq!(b, list.tail); + + let items = collect_list(&mut list); + assert_eq!([7, 5].to_vec(), items); + } + + unsafe { + // Remove first of two + let mut list = LinkedList::new(); + + push_all(&mut list, &[b.as_ref(), a.as_ref()]); + + assert!(list.remove(ptr(&a)).is_some()); + + assert_clean!(a); + + // a should be no longer there and can't be removed twice + assert!(list.remove(ptr(&a)).is_none()); + + assert_ptr_eq!(b, list.head); + assert_ptr_eq!(b, list.tail); + + assert!(b.pointers.get_next().is_none()); + assert!(b.pointers.get_prev().is_none()); + + let items = collect_list(&mut list); + assert_eq!([7].to_vec(), items); + } + + unsafe { + // Remove last of two + let mut list = LinkedList::new(); + + push_all(&mut list, &[b.as_ref(), a.as_ref()]); + + assert!(list.remove(ptr(&b)).is_some()); + + assert_clean!(b); + + assert_ptr_eq!(a, list.head); + assert_ptr_eq!(a, list.tail); + + assert!(a.pointers.get_next().is_none()); + assert!(a.pointers.get_prev().is_none()); + + let items = collect_list(&mut list); + assert_eq!([5].to_vec(), items); + } + + unsafe { + // Remove last item + let mut list = LinkedList::new(); + + push_all(&mut list, &[a.as_ref()]); + + assert!(list.remove(ptr(&a)).is_some()); + assert_clean!(a); + + assert!(list.head.is_none()); + assert!(list.tail.is_none()); + let items = collect_list(&mut list); + assert!(items.is_empty()); + } + + unsafe { + // Remove missing + let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); + + list.push_front(b.as_ref()); + list.push_front(a.as_ref()); + + assert!(list.remove(ptr(&c)).is_none()); + } + } + + #[cfg(not(target_arch = "wasm32"))] + proptest::proptest! { + #[test] + fn fuzz_linked_list(ops: Vec<usize>) { + run_fuzz(ops); + } + } + + fn run_fuzz(ops: Vec<usize>) { + use std::collections::VecDeque; + + #[derive(Debug)] + enum Op { + Push, + Pop, + Remove(usize), + } + + let ops = ops + .iter() + .map(|i| match i % 3 { + 0 => Op::Push, + 1 => Op::Pop, + 2 => Op::Remove(i / 3), + _ => unreachable!(), + }) + .collect::<Vec<_>>(); + + let mut ll = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); + let mut reference = VecDeque::new(); + + let entries: Vec<_> = (0..ops.len()).map(|i| entry(i as i32)).collect(); + + for (i, op) in ops.iter().enumerate() { + match op { + Op::Push => { + reference.push_front(i as i32); + assert_eq!(entries[i].val, i as i32); + + ll.push_front(entries[i].as_ref()); + } + Op::Pop => { + if reference.is_empty() { + assert!(ll.is_empty()); + continue; + } + + let v = reference.pop_back(); + assert_eq!(v, ll.pop_back().map(|v| v.val)); + } + Op::Remove(n) => { + if reference.is_empty() { + assert!(ll.is_empty()); + continue; + } + + let idx = n % reference.len(); + let expect = reference.remove(idx).unwrap(); + + unsafe { + let entry = ll.remove(ptr(&entries[expect as usize])).unwrap(); + assert_eq!(expect, entry.val); + } + } + } + } + } +} diff --git a/third_party/rust/tokio/src/util/mod.rs b/third_party/rust/tokio/src/util/mod.rs new file mode 100644 index 0000000000..618f554380 --- /dev/null +++ b/third_party/rust/tokio/src/util/mod.rs @@ -0,0 +1,83 @@ +cfg_io_driver! { + pub(crate) mod bit; + pub(crate) mod slab; +} + +#[cfg(feature = "rt")] +pub(crate) mod atomic_cell; + +#[cfg(any( + // io driver uses `WakeList` directly + feature = "net", + feature = "process", + // `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`. + feature = "sync", + // `fs` uses `batch_semaphore`, which requires `WakeList`. + feature = "fs", + // rt and signal use `Notify`, which requires `WakeList`. + feature = "rt", + feature = "signal", +))] +mod wake_list; +#[cfg(any( + feature = "net", + feature = "process", + feature = "sync", + feature = "fs", + feature = "rt", + feature = "signal", +))] +pub(crate) use wake_list::WakeList; + +#[cfg(any( + feature = "fs", + feature = "net", + feature = "process", + feature = "rt", + feature = "sync", + feature = "signal", + feature = "time", +))] +pub(crate) mod linked_list; + +#[cfg(any(feature = "rt-multi-thread", feature = "macros"))] +mod rand; + +cfg_rt! { + cfg_unstable! { + mod idle_notified_set; + pub(crate) use idle_notified_set::IdleNotifiedSet; + } + + mod wake; + pub(crate) use wake::WakerRef; + pub(crate) use wake::{waker_ref, Wake}; + + mod sync_wrapper; + pub(crate) use sync_wrapper::SyncWrapper; + + mod vec_deque_cell; + pub(crate) use vec_deque_cell::VecDequeCell; +} + +cfg_rt_multi_thread! { + pub(crate) use self::rand::FastRand; + + mod try_lock; + pub(crate) use try_lock::TryLock; +} + +pub(crate) mod trace; + +#[cfg(any(feature = "macros"))] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub use self::rand::thread_rng_n; + +#[cfg(any( + feature = "rt", + feature = "time", + feature = "net", + feature = "process", + all(unix, feature = "signal") +))] +pub(crate) mod error; diff --git a/third_party/rust/tokio/src/util/pad.rs b/third_party/rust/tokio/src/util/pad.rs new file mode 100644 index 0000000000..bf0913ca85 --- /dev/null +++ b/third_party/rust/tokio/src/util/pad.rs @@ -0,0 +1,52 @@ +use core::fmt; +use core::ops::{Deref, DerefMut}; + +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +#[cfg_attr(target_arch = "x86_64", repr(align(128)))] +#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] +pub(crate) struct CachePadded<T> { + value: T, +} + +unsafe impl<T: Send> Send for CachePadded<T> {} +unsafe impl<T: Sync> Sync for CachePadded<T> {} + +impl<T> CachePadded<T> { + pub(crate) fn new(t: T) -> CachePadded<T> { + CachePadded::<T> { value: t } + } +} + +impl<T> Deref for CachePadded<T> { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl<T> DerefMut for CachePadded<T> { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +impl<T: fmt::Debug> fmt::Debug for CachePadded<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CachePadded") + .field("value", &self.value) + .finish() + } +} + +impl<T> From<T> for CachePadded<T> { + fn from(t: T) -> Self { + CachePadded::new(t) + } +} diff --git a/third_party/rust/tokio/src/util/rand.rs b/third_party/rust/tokio/src/util/rand.rs new file mode 100644 index 0000000000..6b19c8be95 --- /dev/null +++ b/third_party/rust/tokio/src/util/rand.rs @@ -0,0 +1,64 @@ +use std::cell::Cell; + +/// Fast random number generate. +/// +/// Implement xorshift64+: 2 32-bit xorshift sequences added together. +/// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's +/// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> +/// This generator passes the SmallCrush suite, part of TestU01 framework: +/// <http://simul.iro.umontreal.ca/testu01/tu01.html> +#[derive(Debug)] +pub(crate) struct FastRand { + one: Cell<u32>, + two: Cell<u32>, +} + +impl FastRand { + /// Initializes a new, thread-local, fast random number generator. + pub(crate) fn new(seed: u64) -> FastRand { + let one = (seed >> 32) as u32; + let mut two = seed as u32; + + if two == 0 { + // This value cannot be zero + two = 1; + } + + FastRand { + one: Cell::new(one), + two: Cell::new(two), + } + } + + pub(crate) fn fastrand_n(&self, n: u32) -> u32 { + // This is similar to fastrand() % n, but faster. + // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + let mul = (self.fastrand() as u64).wrapping_mul(n as u64); + (mul >> 32) as u32 + } + + fn fastrand(&self) -> u32 { + let mut s1 = self.one.get(); + let s0 = self.two.get(); + + s1 ^= s1 << 17; + s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; + + self.one.set(s0); + self.two.set(s1); + + s0.wrapping_add(s1) + } +} + +// Used by the select macro and `StreamMap` +#[cfg(any(feature = "macros"))] +#[doc(hidden)] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub fn thread_rng_n(n: u32) -> u32 { + thread_local! { + static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); + } + + THREAD_RNG.with(|rng| rng.fastrand_n(n)) +} diff --git a/third_party/rust/tokio/src/util/slab.rs b/third_party/rust/tokio/src/util/slab.rs new file mode 100644 index 0000000000..214fa08dc8 --- /dev/null +++ b/third_party/rust/tokio/src/util/slab.rs @@ -0,0 +1,855 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; +use crate::loom::sync::{Arc, Mutex}; +use crate::util::bit; +use std::fmt; +use std::mem; +use std::ops; +use std::ptr; +use std::sync::atomic::Ordering::Relaxed; + +/// Amortized allocation for homogeneous data types. +/// +/// The slab pre-allocates chunks of memory to store values. It uses a similar +/// growing strategy as `Vec`. When new capacity is needed, the slab grows by +/// 2x. +/// +/// # Pages +/// +/// Unlike `Vec`, growing does not require moving existing elements. Instead of +/// being a continuous chunk of memory for all elements, `Slab` is an array of +/// arrays. The top-level array is an array of pages. Each page is 2x bigger +/// than the previous one. When the slab grows, a new page is allocated. +/// +/// Pages are lazily initialized. +/// +/// # Allocating +/// +/// When allocating an object, first previously used slots are reused. If no +/// previously used slot is available, a new slot is initialized in an existing +/// page. If all pages are full, then a new page is allocated. +/// +/// When an allocated object is released, it is pushed into it's page's free +/// list. Allocating scans all pages for a free slot. +/// +/// # Indexing +/// +/// The slab is able to index values using an address. Even when the indexed +/// object has been released, it is still safe to index. This is a key ability +/// for using the slab with the I/O driver. Addresses are registered with the +/// OS's selector and I/O resources can be released without synchronizing with +/// the OS. +/// +/// # Compaction +/// +/// `Slab::compact` will release pages that have been allocated but are no +/// longer used. This is done by scanning the pages and finding pages with no +/// allocated objects. These pages are then freed. +/// +/// # Synchronization +/// +/// The `Slab` structure is able to provide (mostly) unsynchronized reads to +/// values stored in the slab. Insertions and removals are synchronized. Reading +/// objects via `Ref` is fully unsynchronized. Indexing objects uses amortized +/// synchronization. +/// +pub(crate) struct Slab<T> { + /// Array of pages. Each page is synchronized. + pages: [Arc<Page<T>>; NUM_PAGES], + + /// Caches the array pointer & number of initialized slots. + cached: [CachedPage<T>; NUM_PAGES], +} + +/// Allocate values in the associated slab. +pub(crate) struct Allocator<T> { + /// Pages in the slab. The first page has a capacity of 16 elements. Each + /// following page has double the capacity of the previous page. + /// + /// Each returned `Ref` holds a reference count to this `Arc`. + pages: [Arc<Page<T>>; NUM_PAGES], +} + +/// References a slot in the slab. Indexing a slot using an `Address` is memory +/// safe even if the slot has been released or the page has been deallocated. +/// However, it is not guaranteed that the slot has not been reused and is now +/// represents a different value. +/// +/// The I/O driver uses a counter to track the slot's generation. Once accessing +/// the slot, the generations are compared. If they match, the value matches the +/// address. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) struct Address(usize); + +/// An entry in the slab. +pub(crate) trait Entry: Default { + /// Resets the entry's value and track the generation. + fn reset(&self); +} + +/// A reference to a value stored in the slab. +pub(crate) struct Ref<T> { + value: *const Value<T>, +} + +/// Maximum number of pages a slab can contain. +const NUM_PAGES: usize = 19; + +/// Minimum number of slots a page can contain. +const PAGE_INITIAL_SIZE: usize = 32; +const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1; + +/// A page in the slab. +struct Page<T> { + /// Slots. + slots: Mutex<Slots<T>>, + + // Number of slots currently being used. This is not guaranteed to be up to + // date and should only be used as a hint. + used: AtomicUsize, + + // Set to `true` when the page has been allocated. + allocated: AtomicBool, + + // The number of slots the page can hold. + len: usize, + + // Length of all previous pages combined. + prev_len: usize, +} + +struct CachedPage<T> { + /// Pointer to the page's slots. + slots: *const Slot<T>, + + /// Number of initialized slots. + init: usize, +} + +/// Page state. +struct Slots<T> { + /// Slots. + slots: Vec<Slot<T>>, + + head: usize, + + /// Number of slots currently in use. + used: usize, +} + +unsafe impl<T: Sync> Sync for Page<T> {} +unsafe impl<T: Sync> Send for Page<T> {} +unsafe impl<T: Sync> Sync for CachedPage<T> {} +unsafe impl<T: Sync> Send for CachedPage<T> {} +unsafe impl<T: Sync> Sync for Ref<T> {} +unsafe impl<T: Sync> Send for Ref<T> {} + +/// A slot in the slab. Contains slot-specific metadata. +/// +/// `#[repr(C)]` guarantees that the struct starts w/ `value`. We use pointer +/// math to map a value pointer to an index in the page. +#[repr(C)] +struct Slot<T> { + /// Pointed to by `Ref`. + value: UnsafeCell<Value<T>>, + + /// Next entry in the free list. + next: u32, + + /// Makes miri happy by making mutable references not take exclusive access. + /// + /// Could probably also be fixed by replacing `slots` with a raw-pointer + /// based equivalent. + _pin: std::marker::PhantomPinned, +} + +/// Value paired with a reference to the page. +struct Value<T> { + /// Value stored in the value. + value: T, + + /// Pointer to the page containing the slot. + /// + /// A raw pointer is used as this creates a ref cycle. + page: *const Page<T>, +} + +impl<T> Slab<T> { + /// Create a new, empty, slab. + pub(crate) fn new() -> Slab<T> { + // Initializing arrays is a bit annoying. Instead of manually writing + // out an array and every single entry, `Default::default()` is used to + // initialize the array, then the array is iterated and each value is + // initialized. + let mut slab = Slab { + pages: Default::default(), + cached: Default::default(), + }; + + let mut len = PAGE_INITIAL_SIZE; + let mut prev_len: usize = 0; + + for page in &mut slab.pages { + let page = Arc::get_mut(page).unwrap(); + page.len = len; + page.prev_len = prev_len; + len *= 2; + prev_len += page.len; + + // Ensure we don't exceed the max address space. + debug_assert!( + page.len - 1 + page.prev_len < (1 << 24), + "max = {:b}", + page.len - 1 + page.prev_len + ); + } + + slab + } + + /// Returns a new `Allocator`. + /// + /// The `Allocator` supports concurrent allocation of objects. + pub(crate) fn allocator(&self) -> Allocator<T> { + Allocator { + pages: self.pages.clone(), + } + } + + /// Returns a reference to the value stored at the given address. + /// + /// `&mut self` is used as the call may update internal cached state. + pub(crate) fn get(&mut self, addr: Address) -> Option<&T> { + let page_idx = addr.page(); + let slot_idx = self.pages[page_idx].slot(addr); + + // If the address references a slot that was last seen as uninitialized, + // the `CachedPage` is updated. This requires acquiring the page lock + // and updating the slot pointer and initialized offset. + if self.cached[page_idx].init <= slot_idx { + self.cached[page_idx].refresh(&self.pages[page_idx]); + } + + // If the address **still** references an uninitialized slot, then the + // address is invalid and `None` is returned. + if self.cached[page_idx].init <= slot_idx { + return None; + } + + // Get a reference to the value. The lifetime of the returned reference + // is bound to `&self`. The only way to invalidate the underlying memory + // is to call `compact()`. The lifetimes prevent calling `compact()` + // while references to values are outstanding. + // + // The referenced data is never mutated. Only `&self` references are + // used and the data is `Sync`. + Some(self.cached[page_idx].get(slot_idx)) + } + + /// Calls the given function with a reference to each slot in the slab. The + /// slot may not be in-use. + /// + /// This is used by the I/O driver during the shutdown process to notify + /// each pending task. + pub(crate) fn for_each(&mut self, mut f: impl FnMut(&T)) { + for page_idx in 0..self.pages.len() { + // It is required to avoid holding the lock when calling the + // provided function. The function may attempt to acquire the lock + // itself. If we hold the lock here while calling `f`, a deadlock + // situation is possible. + // + // Instead of iterating the slots directly in `page`, which would + // require holding the lock, the cache is updated and the slots are + // iterated from the cache. + self.cached[page_idx].refresh(&self.pages[page_idx]); + + for slot_idx in 0..self.cached[page_idx].init { + f(self.cached[page_idx].get(slot_idx)); + } + } + } + + // Release memory back to the allocator. + // + // If pages are empty, the underlying memory is released back to the + // allocator. + pub(crate) fn compact(&mut self) { + // Iterate each page except the very first one. The very first page is + // never freed. + for (idx, page) in self.pages.iter().enumerate().skip(1) { + if page.used.load(Relaxed) != 0 || !page.allocated.load(Relaxed) { + // If the page has slots in use or the memory has not been + // allocated then it cannot be compacted. + continue; + } + + let mut slots = match page.slots.try_lock() { + Some(slots) => slots, + // If the lock cannot be acquired due to being held by another + // thread, don't try to compact the page. + _ => continue, + }; + + if slots.used > 0 || slots.slots.capacity() == 0 { + // The page is in use or it has not yet been allocated. Either + // way, there is no more work to do. + continue; + } + + page.allocated.store(false, Relaxed); + + // Remove the slots vector from the page. This is done so that the + // freeing process is done outside of the lock's critical section. + let vec = mem::take(&mut slots.slots); + slots.head = 0; + + // Drop the lock so we can drop the vector outside the lock below. + drop(slots); + + debug_assert!( + self.cached[idx].slots.is_null() || self.cached[idx].slots == vec.as_ptr(), + "cached = {:?}; actual = {:?}", + self.cached[idx].slots, + vec.as_ptr(), + ); + + // Clear cache + self.cached[idx].slots = ptr::null(); + self.cached[idx].init = 0; + + drop(vec); + } + } +} + +impl<T> fmt::Debug for Slab<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + debug(fmt, "Slab", &self.pages[..]) + } +} + +impl<T: Entry> Allocator<T> { + /// Allocate a new entry and return a handle to the entry. + /// + /// Scans pages from smallest to biggest, stopping when a slot is found. + /// Pages are allocated if necessary. + /// + /// Returns `None` if the slab is full. + pub(crate) fn allocate(&self) -> Option<(Address, Ref<T>)> { + // Find the first available slot. + for page in &self.pages[..] { + if let Some((addr, val)) = Page::allocate(page) { + return Some((addr, val)); + } + } + + None + } +} + +impl<T> fmt::Debug for Allocator<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + debug(fmt, "slab::Allocator", &self.pages[..]) + } +} + +impl<T> ops::Deref for Ref<T> { + type Target = T; + + fn deref(&self) -> &T { + // Safety: `&mut` is never handed out to the underlying value. The page + // is not freed until all `Ref` values are dropped. + unsafe { &(*self.value).value } + } +} + +impl<T> Drop for Ref<T> { + fn drop(&mut self) { + // Safety: `&mut` is never handed out to the underlying value. The page + // is not freed until all `Ref` values are dropped. + let _ = unsafe { (*self.value).release() }; + } +} + +impl<T: fmt::Debug> fmt::Debug for Ref<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(fmt) + } +} + +impl<T: Entry> Page<T> { + // Allocates an object, returns the ref and address. + // + // `self: &Arc<Page<T>>` is avoided here as this would not work with the + // loom `Arc`. + fn allocate(me: &Arc<Page<T>>) -> Option<(Address, Ref<T>)> { + // Before acquiring the lock, use the `used` hint. + if me.used.load(Relaxed) == me.len { + return None; + } + + // Allocating objects requires synchronization + let mut locked = me.slots.lock(); + + if locked.head < locked.slots.len() { + // Re-use an already initialized slot. + // + // Help out the borrow checker + let locked = &mut *locked; + + // Get the index of the slot at the head of the free stack. This is + // the slot that will be reused. + let idx = locked.head; + let slot = &locked.slots[idx]; + + // Update the free stack head to point to the next slot. + locked.head = slot.next as usize; + + // Increment the number of used slots + locked.used += 1; + me.used.store(locked.used, Relaxed); + + // Reset the slot + slot.value.with(|ptr| unsafe { (*ptr).value.reset() }); + + // Return a reference to the slot + Some((me.addr(idx), locked.gen_ref(idx, me))) + } else if me.len == locked.slots.len() { + // The page is full + None + } else { + // No initialized slots are available, but the page has more + // capacity. Initialize a new slot. + let idx = locked.slots.len(); + + if idx == 0 { + // The page has not yet been allocated. Allocate the storage for + // all page slots. + locked.slots.reserve_exact(me.len); + } + + // Initialize a new slot + locked.slots.push(Slot { + value: UnsafeCell::new(Value { + value: Default::default(), + page: Arc::as_ptr(me), + }), + next: 0, + _pin: std::marker::PhantomPinned, + }); + + // Increment the head to indicate the free stack is empty + locked.head += 1; + + // Increment the number of used slots + locked.used += 1; + me.used.store(locked.used, Relaxed); + me.allocated.store(true, Relaxed); + + debug_assert_eq!(locked.slots.len(), locked.head); + + Some((me.addr(idx), locked.gen_ref(idx, me))) + } + } +} + +impl<T> Page<T> { + /// Returns the slot index within the current page referenced by the given + /// address. + fn slot(&self, addr: Address) -> usize { + addr.0 - self.prev_len + } + + /// Returns the address for the given slot. + fn addr(&self, slot: usize) -> Address { + Address(slot + self.prev_len) + } +} + +impl<T> Default for Page<T> { + fn default() -> Page<T> { + Page { + used: AtomicUsize::new(0), + allocated: AtomicBool::new(false), + slots: Mutex::new(Slots { + slots: Vec::new(), + head: 0, + used: 0, + }), + len: 0, + prev_len: 0, + } + } +} + +impl<T> Page<T> { + /// Release a slot into the page's free list. + fn release(&self, value: *const Value<T>) { + let mut locked = self.slots.lock(); + + let idx = locked.index_for(value); + locked.slots[idx].next = locked.head as u32; + locked.head = idx; + locked.used -= 1; + + self.used.store(locked.used, Relaxed); + } +} + +impl<T> CachedPage<T> { + /// Refreshes the cache. + fn refresh(&mut self, page: &Page<T>) { + let slots = page.slots.lock(); + + if !slots.slots.is_empty() { + self.slots = slots.slots.as_ptr(); + self.init = slots.slots.len(); + } + } + + /// Gets a value by index. + fn get(&self, idx: usize) -> &T { + assert!(idx < self.init); + + // Safety: Pages are allocated concurrently, but are only ever + // **deallocated** by `Slab`. `Slab` will always have a more + // conservative view on the state of the slot array. Once `CachedPage` + // sees a slot pointer and initialized offset, it will remain valid + // until `compact()` is called. The `compact()` function also updates + // `CachedPage`. + unsafe { + let slot = self.slots.add(idx); + let value = slot as *const Value<T>; + + &(*value).value + } + } +} + +impl<T> Default for CachedPage<T> { + fn default() -> CachedPage<T> { + CachedPage { + slots: ptr::null(), + init: 0, + } + } +} + +impl<T> Slots<T> { + /// Maps a slot pointer to an offset within the current page. + /// + /// The pointer math removes the `usize` index from the `Ref` struct, + /// shrinking the struct to a single pointer size. The contents of the + /// function is safe, the resulting `usize` is bounds checked before being + /// used. + /// + /// # Panics + /// + /// panics if the provided slot pointer is not contained by the page. + fn index_for(&self, slot: *const Value<T>) -> usize { + use std::mem; + + let base = &self.slots[0] as *const _ as usize; + + assert!(base != 0, "page is unallocated"); + + let slot = slot as usize; + let width = mem::size_of::<Slot<T>>(); + + assert!(slot >= base, "unexpected pointer"); + + let idx = (slot - base) / width; + assert!(idx < self.slots.len() as usize); + + idx + } + + /// Generates a `Ref` for the slot at the given index. This involves bumping the page's ref count. + fn gen_ref(&self, idx: usize, page: &Arc<Page<T>>) -> Ref<T> { + assert!(idx < self.slots.len()); + mem::forget(page.clone()); + + let vec_ptr = self.slots.as_ptr(); + let slot: *const Slot<T> = unsafe { vec_ptr.add(idx) }; + let value: *const Value<T> = slot as *const Value<T>; + + Ref { value } + } +} + +impl<T> Value<T> { + /// Releases the slot, returning the `Arc<Page<T>>` logically owned by the ref. + fn release(&self) -> Arc<Page<T>> { + // Safety: called by `Ref`, which owns an `Arc<Page<T>>` instance. + let page = unsafe { Arc::from_raw(self.page) }; + page.release(self as *const _); + page + } +} + +impl Address { + fn page(self) -> usize { + // Since every page is twice as large as the previous page, and all page + // sizes are powers of two, we can determine the page index that + // contains a given address by shifting the address down by the smallest + // page size and looking at how many twos places necessary to represent + // that number, telling us what power of two page size it fits inside + // of. We can determine the number of twos places by counting the number + // of leading zeros (unused twos places) in the number's binary + // representation, and subtracting that count from the total number of + // bits in a word. + let slot_shifted = (self.0 + PAGE_INITIAL_SIZE) >> PAGE_INDEX_SHIFT; + (bit::pointer_width() - slot_shifted.leading_zeros()) as usize + } + + pub(crate) const fn as_usize(self) -> usize { + self.0 + } + + pub(crate) fn from_usize(src: usize) -> Address { + Address(src) + } +} + +fn debug<T>(fmt: &mut fmt::Formatter<'_>, name: &str, pages: &[Arc<Page<T>>]) -> fmt::Result { + let mut capacity = 0; + let mut len = 0; + + for page in pages { + if page.allocated.load(Relaxed) { + capacity += page.len; + len += page.used.load(Relaxed); + } + } + + fmt.debug_struct(name) + .field("len", &len) + .field("capacity", &capacity) + .finish() +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::SeqCst; + + struct Foo { + cnt: AtomicUsize, + id: AtomicUsize, + } + + impl Default for Foo { + fn default() -> Foo { + Foo { + cnt: AtomicUsize::new(0), + id: AtomicUsize::new(0), + } + } + } + + impl Entry for Foo { + fn reset(&self) { + self.cnt.fetch_add(1, SeqCst); + } + } + + #[test] + fn insert_remove() { + let mut slab = Slab::<Foo>::new(); + let alloc = slab.allocator(); + + let (addr1, foo1) = alloc.allocate().unwrap(); + foo1.id.store(1, SeqCst); + assert_eq!(0, foo1.cnt.load(SeqCst)); + + let (addr2, foo2) = alloc.allocate().unwrap(); + foo2.id.store(2, SeqCst); + assert_eq!(0, foo2.cnt.load(SeqCst)); + + assert_eq!(1, slab.get(addr1).unwrap().id.load(SeqCst)); + assert_eq!(2, slab.get(addr2).unwrap().id.load(SeqCst)); + + drop(foo1); + + assert_eq!(1, slab.get(addr1).unwrap().id.load(SeqCst)); + + let (addr3, foo3) = alloc.allocate().unwrap(); + assert_eq!(addr3, addr1); + assert_eq!(1, foo3.cnt.load(SeqCst)); + foo3.id.store(3, SeqCst); + assert_eq!(3, slab.get(addr3).unwrap().id.load(SeqCst)); + + drop(foo2); + drop(foo3); + + slab.compact(); + + // The first page is never released + assert!(slab.get(addr1).is_some()); + assert!(slab.get(addr2).is_some()); + assert!(slab.get(addr3).is_some()); + } + + #[test] + fn insert_many() { + const MANY: usize = normal_or_miri(10_000, 50); + + let mut slab = Slab::<Foo>::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for i in 0..MANY { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + entries.push((addr, val)); + } + + for (i, (addr, v)) in entries.iter().enumerate() { + assert_eq!(i, v.id.load(SeqCst)); + assert_eq!(i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + + entries.clear(); + + for i in 0..MANY { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(MANY - i, SeqCst); + entries.push((addr, val)); + } + + for (i, (addr, v)) in entries.iter().enumerate() { + assert_eq!(MANY - i, v.id.load(SeqCst)); + assert_eq!(MANY - i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + + #[test] + fn insert_drop_reverse() { + let mut slab = Slab::<Foo>::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for i in 0..normal_or_miri(10_000, 100) { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + entries.push((addr, val)); + } + + for _ in 0..10 { + // Drop 1000 in reverse + for _ in 0..normal_or_miri(1_000, 10) { + entries.pop(); + } + + // Check remaining + for (i, (addr, v)) in entries.iter().enumerate() { + assert_eq!(i, v.id.load(SeqCst)); + assert_eq!(i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + } + + #[test] + fn no_compaction_if_page_still_in_use() { + let mut slab = Slab::<Foo>::new(); + let alloc = slab.allocator(); + let mut entries1 = vec![]; + let mut entries2 = vec![]; + + for i in 0..normal_or_miri(10_000, 100) { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + + if i % 2 == 0 { + entries1.push((addr, val, i)); + } else { + entries2.push(val); + } + } + + drop(entries2); + + for (addr, _, i) in &entries1 { + assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + + const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } + } + + #[test] + fn compact_all() { + let mut slab = Slab::<Foo>::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for _ in 0..2 { + entries.clear(); + + for i in 0..normal_or_miri(10_000, 100) { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + + entries.push((addr, val)); + } + + let mut addrs = vec![]; + + for (addr, _) in entries.drain(..) { + addrs.push(addr); + } + + slab.compact(); + + // The first page is never freed + for addr in &addrs[PAGE_INITIAL_SIZE..] { + assert!(slab.get(*addr).is_none()); + } + } + } + + #[test] + fn issue_3014() { + let mut slab = Slab::<Foo>::new(); + let alloc = slab.allocator(); + let mut entries = vec![]; + + for _ in 0..normal_or_miri(5, 2) { + entries.clear(); + + // Allocate a few pages + 1 + for i in 0..(32 + 64 + 128 + 1) { + let (addr, val) = alloc.allocate().unwrap(); + val.id.store(i, SeqCst); + + entries.push((addr, val, i)); + } + + for (addr, val, i) in &entries { + assert_eq!(*i, val.id.load(SeqCst)); + assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + + // Release the last entry + entries.pop(); + + // Compact + slab.compact(); + + // Check all the addresses + + for (addr, val, i) in &entries { + assert_eq!(*i, val.id.load(SeqCst)); + assert_eq!(*i, slab.get(*addr).unwrap().id.load(SeqCst)); + } + } + } +} diff --git a/third_party/rust/tokio/src/util/sync_wrapper.rs b/third_party/rust/tokio/src/util/sync_wrapper.rs new file mode 100644 index 0000000000..5ffc8f96b1 --- /dev/null +++ b/third_party/rust/tokio/src/util/sync_wrapper.rs @@ -0,0 +1,26 @@ +//! This module contains a type that can make `Send + !Sync` types `Sync` by +//! disallowing all immutable access to the value. +//! +//! A similar primitive is provided in the `sync_wrapper` crate. + +pub(crate) struct SyncWrapper<T> { + value: T, +} + +// safety: The SyncWrapper being send allows you to send the inner value across +// thread boundaries. +unsafe impl<T: Send> Send for SyncWrapper<T> {} + +// safety: An immutable reference to a SyncWrapper is useless, so moving such an +// immutable reference across threads is safe. +unsafe impl<T> Sync for SyncWrapper<T> {} + +impl<T> SyncWrapper<T> { + pub(crate) fn new(value: T) -> Self { + Self { value } + } + + pub(crate) fn into_inner(self) -> T { + self.value + } +} diff --git a/third_party/rust/tokio/src/util/trace.rs b/third_party/rust/tokio/src/util/trace.rs new file mode 100644 index 0000000000..6080e2358a --- /dev/null +++ b/third_party/rust/tokio/src/util/trace.rs @@ -0,0 +1,99 @@ +cfg_trace! { + cfg_rt! { + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use pin_project_lite::pin_project; + use std::future::Future; + pub(crate) use tracing::instrument::Instrumented; + + #[inline] + #[track_caller] + pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>) -> Instrumented<F> { + use tracing::instrument::Instrument; + let location = std::panic::Location::caller(); + let span = tracing::trace_span!( + target: "tokio::task", + "runtime.spawn", + %kind, + task.name = %name.unwrap_or_default(), + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + task.instrument(span) + } + + pub(crate) fn async_op<P,F>(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp<F> + where P: FnOnce() -> F { + resource_span.in_scope(|| { + let async_op_span = tracing::trace_span!("runtime.resource.async_op", source = source, inherits_child_attrs = inherits_child_attrs); + let enter = async_op_span.enter(); + let async_op_poll_span = tracing::trace_span!("runtime.resource.async_op.poll"); + let inner = inner(); + drop(enter); + let tracing_ctx = AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span: resource_span.clone(), + }; + InstrumentedAsyncOp { + inner, + tracing_ctx, + poll_op_name, + } + }) + } + + #[derive(Debug, Clone)] + pub(crate) struct AsyncOpTracingCtx { + pub(crate) async_op_span: tracing::Span, + pub(crate) async_op_poll_span: tracing::Span, + pub(crate) resource_span: tracing::Span, + } + + + pin_project! { + #[derive(Debug, Clone)] + pub(crate) struct InstrumentedAsyncOp<F> { + #[pin] + pub(crate) inner: F, + pub(crate) tracing_ctx: AsyncOpTracingCtx, + pub(crate) poll_op_name: &'static str + } + } + + impl<F: Future> Future for InstrumentedAsyncOp<F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.project(); + let poll_op_name = &*this.poll_op_name; + let _res_enter = this.tracing_ctx.resource_span.enter(); + let _async_op_enter = this.tracing_ctx.async_op_span.enter(); + let _async_op_poll_enter = this.tracing_ctx.async_op_poll_span.enter(); + trace_poll_op!(poll_op_name, this.inner.poll(cx)) + } + } + } +} +cfg_time! { + #[track_caller] + pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Some(std::panic::Location::caller()); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + None + } +} + +cfg_not_trace! { + cfg_rt! { + #[inline] + pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>) -> F { + // nop + task + } + } +} diff --git a/third_party/rust/tokio/src/util/try_lock.rs b/third_party/rust/tokio/src/util/try_lock.rs new file mode 100644 index 0000000000..8b0edb4a87 --- /dev/null +++ b/third_party/rust/tokio/src/util/try_lock.rs @@ -0,0 +1,80 @@ +use crate::loom::sync::atomic::AtomicBool; + +use std::cell::UnsafeCell; +use std::marker::PhantomData; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::Ordering::SeqCst; + +pub(crate) struct TryLock<T> { + locked: AtomicBool, + data: UnsafeCell<T>, +} + +pub(crate) struct LockGuard<'a, T> { + lock: &'a TryLock<T>, + _p: PhantomData<std::rc::Rc<()>>, +} + +unsafe impl<T: Send> Send for TryLock<T> {} +unsafe impl<T: Send> Sync for TryLock<T> {} + +unsafe impl<T: Sync> Sync for LockGuard<'_, T> {} + +macro_rules! new { + ($data:ident) => { + TryLock { + locked: AtomicBool::new(false), + data: UnsafeCell::new($data), + } + }; +} + +impl<T> TryLock<T> { + #[cfg(not(loom))] + /// Create a new `TryLock` + pub(crate) const fn new(data: T) -> TryLock<T> { + new!(data) + } + + #[cfg(loom)] + /// Create a new `TryLock` + pub(crate) fn new(data: T) -> TryLock<T> { + new!(data) + } + + /// Attempt to acquire lock + pub(crate) fn try_lock(&self) -> Option<LockGuard<'_, T>> { + if self + .locked + .compare_exchange(false, true, SeqCst, SeqCst) + .is_err() + { + return None; + } + + Some(LockGuard { + lock: self, + _p: PhantomData, + }) + } +} + +impl<T> Deref for LockGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +impl<T> DerefMut for LockGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +impl<T> Drop for LockGuard<'_, T> { + fn drop(&mut self) { + self.lock.locked.store(false, SeqCst); + } +} diff --git a/third_party/rust/tokio/src/util/vec_deque_cell.rs b/third_party/rust/tokio/src/util/vec_deque_cell.rs new file mode 100644 index 0000000000..b4e124c151 --- /dev/null +++ b/third_party/rust/tokio/src/util/vec_deque_cell.rs @@ -0,0 +1,53 @@ +use crate::loom::cell::UnsafeCell; + +use std::collections::VecDeque; +use std::marker::PhantomData; + +/// This type is like VecDeque, except that it is not Sync and can be modified +/// through immutable references. +pub(crate) struct VecDequeCell<T> { + inner: UnsafeCell<VecDeque<T>>, + _not_sync: PhantomData<*const ()>, +} + +// This is Send for the same reasons that RefCell<VecDeque<T>> is Send. +unsafe impl<T: Send> Send for VecDequeCell<T> {} + +impl<T> VecDequeCell<T> { + pub(crate) fn with_capacity(cap: usize) -> Self { + Self { + inner: UnsafeCell::new(VecDeque::with_capacity(cap)), + _not_sync: PhantomData, + } + } + + /// Safety: This method may not be called recursively. + #[inline] + unsafe fn with_inner<F, R>(&self, f: F) -> R + where + F: FnOnce(&mut VecDeque<T>) -> R, + { + // safety: This type is not Sync, so concurrent calls of this method + // cannot happen. Furthermore, the caller guarantees that the method is + // not called recursively. Finally, this is the only place that can + // create mutable references to the inner VecDeque. This ensures that + // any mutable references created here are exclusive. + self.inner.with_mut(|ptr| f(&mut *ptr)) + } + + pub(crate) fn pop_front(&self) -> Option<T> { + unsafe { self.with_inner(VecDeque::pop_front) } + } + + pub(crate) fn push_back(&self, item: T) { + unsafe { + self.with_inner(|inner| inner.push_back(item)); + } + } + + /// Replaces the inner VecDeque with an empty VecDeque and return the current + /// contents. + pub(crate) fn take(&self) -> VecDeque<T> { + unsafe { self.with_inner(|inner| std::mem::take(inner)) } + } +} diff --git a/third_party/rust/tokio/src/util/wake.rs b/third_party/rust/tokio/src/util/wake.rs new file mode 100644 index 0000000000..5526cbc63a --- /dev/null +++ b/third_party/rust/tokio/src/util/wake.rs @@ -0,0 +1,80 @@ +use crate::loom::sync::Arc; + +use std::marker::PhantomData; +use std::mem::ManuallyDrop; +use std::ops::Deref; +use std::task::{RawWaker, RawWakerVTable, Waker}; + +/// Simplified waking interface based on Arcs. +pub(crate) trait Wake: Send + Sync + Sized + 'static { + /// Wake by value. + fn wake(arc_self: Arc<Self>); + + /// Wake by reference. + fn wake_by_ref(arc_self: &Arc<Self>); +} + +/// A `Waker` that is only valid for a given lifetime. +#[derive(Debug)] +pub(crate) struct WakerRef<'a> { + waker: ManuallyDrop<Waker>, + _p: PhantomData<&'a ()>, +} + +impl Deref for WakerRef<'_> { + type Target = Waker; + + fn deref(&self) -> &Waker { + &self.waker + } +} + +/// Creates a reference to a `Waker` from a reference to `Arc<impl Wake>`. +pub(crate) fn waker_ref<W: Wake>(wake: &Arc<W>) -> WakerRef<'_> { + let ptr = Arc::as_ptr(wake) as *const (); + + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) }; + + WakerRef { + waker: ManuallyDrop::new(waker), + _p: PhantomData, + } +} + +fn waker_vtable<W: Wake>() -> &'static RawWakerVTable { + &RawWakerVTable::new( + clone_arc_raw::<W>, + wake_arc_raw::<W>, + wake_by_ref_arc_raw::<W>, + drop_arc_raw::<W>, + ) +} + +unsafe fn inc_ref_count<T: Wake>(data: *const ()) { + // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop + let arc = ManuallyDrop::new(Arc::<T>::from_raw(data as *const T)); + + // Now increase refcount, but don't drop new refcount either + let _arc_clone: ManuallyDrop<_> = arc.clone(); +} + +unsafe fn clone_arc_raw<T: Wake>(data: *const ()) -> RawWaker { + inc_ref_count::<T>(data); + RawWaker::new(data, waker_vtable::<T>()) +} + +unsafe fn wake_arc_raw<T: Wake>(data: *const ()) { + let arc: Arc<T> = Arc::from_raw(data as *const T); + Wake::wake(arc); +} + +// used by `waker_ref` +unsafe fn wake_by_ref_arc_raw<T: Wake>(data: *const ()) { + // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop + let arc = ManuallyDrop::new(Arc::<T>::from_raw(data as *const T)); + Wake::wake_by_ref(&arc); +} + +unsafe fn drop_arc_raw<T: Wake>(data: *const ()) { + drop(Arc::<T>::from_raw(data as *const T)) +} diff --git a/third_party/rust/tokio/src/util/wake_list.rs b/third_party/rust/tokio/src/util/wake_list.rs new file mode 100644 index 0000000000..aa569dd17b --- /dev/null +++ b/third_party/rust/tokio/src/util/wake_list.rs @@ -0,0 +1,53 @@ +use core::mem::MaybeUninit; +use core::ptr; +use std::task::Waker; + +const NUM_WAKERS: usize = 32; + +pub(crate) struct WakeList { + inner: [MaybeUninit<Waker>; NUM_WAKERS], + curr: usize, +} + +impl WakeList { + pub(crate) fn new() -> Self { + Self { + inner: unsafe { + // safety: Create an uninitialized array of `MaybeUninit`. The + // `assume_init` is safe because the type we are claiming to + // have initialized here is a bunch of `MaybeUninit`s, which do + // not require initialization. + MaybeUninit::uninit().assume_init() + }, + curr: 0, + } + } + + #[inline] + pub(crate) fn can_push(&self) -> bool { + self.curr < NUM_WAKERS + } + + pub(crate) fn push(&mut self, val: Waker) { + debug_assert!(self.can_push()); + + self.inner[self.curr] = MaybeUninit::new(val); + self.curr += 1; + } + + pub(crate) fn wake_all(&mut self) { + assert!(self.curr <= NUM_WAKERS); + while self.curr > 0 { + self.curr -= 1; + let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) }; + waker.wake(); + } + } +} + +impl Drop for WakeList { + fn drop(&mut self) { + let slice = ptr::slice_from_raw_parts_mut(self.inner.as_mut_ptr() as *mut Waker, self.curr); + unsafe { ptr::drop_in_place(slice) }; + } +} |