diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/util | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/util')
-rw-r--r-- | vendor/tokio/src/util/atomic_cell.rs | 51 | ||||
-rw-r--r-- | vendor/tokio/src/util/bit.rs | 6 | ||||
-rw-r--r-- | vendor/tokio/src/util/error.rs | 11 | ||||
-rw-r--r-- | vendor/tokio/src/util/idle_notified_set.rs | 481 | ||||
-rw-r--r-- | vendor/tokio/src/util/linked_list.rs | 276 | ||||
-rw-r--r-- | vendor/tokio/src/util/markers.rs | 8 | ||||
-rw-r--r-- | vendor/tokio/src/util/memchr.rs | 74 | ||||
-rw-r--r-- | vendor/tokio/src/util/mod.rs | 67 | ||||
-rw-r--r-- | vendor/tokio/src/util/once_cell.rs | 70 | ||||
-rw-r--r-- | vendor/tokio/src/util/pad.rs | 52 | ||||
-rw-r--r-- | vendor/tokio/src/util/rand.rs | 87 | ||||
-rw-r--r-- | vendor/tokio/src/util/rand/rt.rs | 61 | ||||
-rw-r--r-- | vendor/tokio/src/util/rand/rt_unstable.rs | 20 | ||||
-rw-r--r-- | vendor/tokio/src/util/rc_cell.rs | 57 | ||||
-rw-r--r-- | vendor/tokio/src/util/slab.rs | 97 | ||||
-rw-r--r-- | vendor/tokio/src/util/sync_wrapper.rs | 26 | ||||
-rw-r--r-- | vendor/tokio/src/util/trace.rs | 91 | ||||
-rw-r--r-- | vendor/tokio/src/util/wake.rs | 15 | ||||
-rw-r--r-- | vendor/tokio/src/util/wake_list.rs | 53 |
19 files changed, 1374 insertions, 229 deletions
diff --git a/vendor/tokio/src/util/atomic_cell.rs b/vendor/tokio/src/util/atomic_cell.rs new file mode 100644 index 000000000..07e37303a --- /dev/null +++ b/vendor/tokio/src/util/atomic_cell.rs @@ -0,0 +1,51 @@ +use crate::loom::sync::atomic::AtomicPtr; + +use std::ptr; +use std::sync::atomic::Ordering::AcqRel; + +pub(crate) struct AtomicCell<T> { + data: AtomicPtr<T>, +} + +unsafe impl<T: Send> Send for AtomicCell<T> {} +unsafe impl<T: Send> Sync for AtomicCell<T> {} + +impl<T> AtomicCell<T> { + pub(crate) fn new(data: Option<Box<T>>) -> AtomicCell<T> { + AtomicCell { + data: AtomicPtr::new(to_raw(data)), + } + } + + pub(crate) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> { + let old = self.data.swap(to_raw(val), AcqRel); + from_raw(old) + } + + pub(crate) fn set(&self, val: Box<T>) { + let _ = self.swap(Some(val)); + } + + pub(crate) fn take(&self) -> Option<Box<T>> { + self.swap(None) + } +} + +fn to_raw<T>(data: Option<Box<T>>) -> *mut T { + data.map(Box::into_raw).unwrap_or(ptr::null_mut()) +} + +fn from_raw<T>(val: *mut T) -> Option<Box<T>> { + if val.is_null() { + None + } else { + Some(unsafe { Box::from_raw(val) }) + } +} + +impl<T> Drop for AtomicCell<T> { + fn drop(&mut self) { + // Free any data still held by the cell + let _ = self.take(); + } +} diff --git a/vendor/tokio/src/util/bit.rs b/vendor/tokio/src/util/bit.rs index 392a0e8b0..a43c2c2d3 100644 --- a/vendor/tokio/src/util/bit.rs +++ b/vendor/tokio/src/util/bit.rs @@ -27,7 +27,7 @@ impl Pack { pointer_width() - (self.mask >> self.shift).leading_zeros() } - /// Max representable value + /// Max representable value. pub(crate) const fn max_value(&self) -> usize { (1 << self.width()) - 1 } @@ -60,7 +60,7 @@ impl fmt::Debug for Pack { } } -/// Returns the width of a pointer in bits +/// Returns the width of a pointer in bits. pub(crate) const fn pointer_width() -> u32 { std::mem::size_of::<usize>() as u32 * 8 } @@ -71,7 +71,7 @@ pub(crate) const fn mask_for(n: u32) -> usize { shift | (shift - 1) } -/// Unpack a value using a mask & shift +/// Unpacks a value using a mask & shift. pub(crate) const fn unpack(src: usize, mask: usize, shift: u32) -> usize { (src & mask) >> shift } diff --git a/vendor/tokio/src/util/error.rs b/vendor/tokio/src/util/error.rs index 0e52364a7..ebb27f638 100644 --- a/vendor/tokio/src/util/error.rs +++ b/vendor/tokio/src/util/error.rs @@ -1,9 +1,16 @@ +// Some combinations of features may not use these constants. +#![cfg_attr(not(feature = "full"), allow(dead_code))] + /// Error string explaining that the Tokio context hasn't been instantiated. pub(crate) const CONTEXT_MISSING_ERROR: &str = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"; -// some combinations of features might not use this -#[allow(dead_code)] /// Error string explaining that the Tokio context is shutting down and cannot drive timers. pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = "A Tokio 1.x context was found, but it is being shutdown."; + +/// Error string explaining that the Tokio context is not available because the +/// thread-local storing it has been destroyed. This usually only happens during +/// destructors of other thread-locals. +pub(crate) const THREAD_LOCAL_DESTROYED_ERROR: &str = + "The Tokio context thread-local variable has been destroyed."; diff --git a/vendor/tokio/src/util/idle_notified_set.rs b/vendor/tokio/src/util/idle_notified_set.rs new file mode 100644 index 000000000..19b81e28b --- /dev/null +++ b/vendor/tokio/src/util/idle_notified_set.rs @@ -0,0 +1,481 @@ +//! This module defines an `IdleNotifiedSet`, which is a collection of elements. +//! Each element is intended to correspond to a task, and the collection will +//! keep track of which tasks have had their waker notified, and which have not. +//! +//! Each entry in the set holds some user-specified value. The value's type is +//! specified using the `T` parameter. It will usually be a `JoinHandle` or +//! similar. + +use std::marker::PhantomPinned; +use std::mem::ManuallyDrop; +use std::ptr::NonNull; +use std::task::{Context, Waker}; + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::{Arc, Mutex}; +use crate::util::linked_list::{self, Link}; +use crate::util::{waker_ref, Wake}; + +type LinkedList<T> = + linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>; + +/// This is the main handle to the collection. +pub(crate) struct IdleNotifiedSet<T> { + lists: Arc<Lists<T>>, + length: usize, +} + +/// A handle to an entry that is guaranteed to be stored in the idle or notified +/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet` +/// mutably to prevent the entry from being moved to the `Neither` list, which +/// only the `IdleNotifiedSet` may do. +/// +/// The main consequence of being stored in one of the lists is that the `value` +/// field has not yet been consumed. +/// +/// Note: This entry can be moved from the idle to the notified list while this +/// object exists by waking its waker. +pub(crate) struct EntryInOneOfTheLists<'a, T> { + entry: Arc<ListEntry<T>>, + set: &'a mut IdleNotifiedSet<T>, +} + +type Lists<T> = Mutex<ListsInner<T>>; + +/// The linked lists hold strong references to the ListEntry items, and the +/// ListEntry items also hold a strong reference back to the Lists object, but +/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once +/// that object is destroyed, no ref-cycles will remain. +struct ListsInner<T> { + notified: LinkedList<T>, + idle: LinkedList<T>, + /// Whenever an element in the `notified` list is woken, this waker will be + /// notified and consumed, if it exists. + waker: Option<Waker>, +} + +/// Which of the two lists in the shared Lists object is this entry stored in? +/// +/// If the value is `Idle`, then an entry's waker may move it to the notified +/// list. Otherwise, only the `IdleNotifiedSet` may move it. +/// +/// If the value is `Neither`, then it is still possible that the entry is in +/// some third external list (this happens in `drain`). +#[derive(Copy, Clone, Eq, PartialEq)] +enum List { + Notified, + Idle, + Neither, +} + +/// An entry in the list. +/// +/// # Safety +/// +/// The `my_list` field must only be accessed while holding the mutex in +/// `parent`. It is an invariant that the value of `my_list` corresponds to +/// which linked list in the `parent` holds this entry. Once this field takes +/// the value `Neither`, then it may never be modified again. +/// +/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field +/// must only be accessed while holding the mutex. If the value of `my_list` is +/// `Neither`, then the `pointers` field may be accessed by the +/// `IdleNotifiedSet` (this happens inside `drain`). +/// +/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed +/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to +/// `Neither` assumes ownership of the `value`, and it must either drop it or +/// move it out from this entry to prevent it from getting leaked. (Since the +/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the +/// value should not be leaked.) +struct ListEntry<T> { + /// The linked list pointers of the list this entry is in. + pointers: linked_list::Pointers<ListEntry<T>>, + /// Pointer to the shared `Lists` struct. + parent: Arc<Lists<T>>, + /// The value stored in this entry. + value: UnsafeCell<ManuallyDrop<T>>, + /// Used to remember which list this entry is in. + my_list: UnsafeCell<List>, + /// Required by the `linked_list::Pointers` field. + _pin: PhantomPinned, +} + +generate_addr_of_methods! { + impl<T> ListEntry<T> { + unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> { + &self.pointers + } + } +} + +// With mutable access to the `IdleNotifiedSet`, you can get mutable access to +// the values. +unsafe impl<T: Send> Send for IdleNotifiedSet<T> {} +// With the current API we strictly speaking don't even need `T: Sync`, but we +// require it anyway to support adding &self APIs that access the values in the +// future. +unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {} + +// These impls control when it is safe to create a Waker. Since the waker does +// not allow access to the value in any way (including its destructor), it is +// not necessary for `T` to be Send or Sync. +unsafe impl<T> Send for ListEntry<T> {} +unsafe impl<T> Sync for ListEntry<T> {} + +impl<T> IdleNotifiedSet<T> { + /// Create a new IdleNotifiedSet. + pub(crate) fn new() -> Self { + let lists = Mutex::new(ListsInner { + notified: LinkedList::new(), + idle: LinkedList::new(), + waker: None, + }); + + IdleNotifiedSet { + lists: Arc::new(lists), + length: 0, + } + } + + pub(crate) fn len(&self) -> usize { + self.length + } + + pub(crate) fn is_empty(&self) -> bool { + self.length == 0 + } + + /// Insert the given value into the `idle` list. + pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> { + self.length += 1; + + let entry = Arc::new(ListEntry { + parent: self.lists.clone(), + value: UnsafeCell::new(ManuallyDrop::new(value)), + my_list: UnsafeCell::new(List::Idle), + pointers: linked_list::Pointers::new(), + _pin: PhantomPinned, + }); + + { + let mut lock = self.lists.lock(); + lock.idle.push_front(entry.clone()); + } + + // Safety: We just put the entry in the idle list, so it is in one of the lists. + EntryInOneOfTheLists { entry, set: self } + } + + /// Pop an entry from the notified list to poll it. The entry is moved to + /// the idle list atomically. + pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> { + // We don't decrement the length because this call moves the entry to + // the idle list rather than removing it. + if self.length == 0 { + // Fast path. + return None; + } + + let mut lock = self.lists.lock(); + + let should_update_waker = match lock.waker.as_mut() { + Some(cur_waker) => !waker.will_wake(cur_waker), + None => true, + }; + if should_update_waker { + lock.waker = Some(waker.clone()); + } + + // Pop the entry, returning None if empty. + let entry = lock.notified.pop_back()?; + + lock.idle.push_front(entry.clone()); + + // Safety: We are holding the lock. + entry.my_list.with_mut(|ptr| unsafe { + *ptr = List::Idle; + }); + + drop(lock); + + // Safety: We just put the entry in the idle list, so it is in one of the lists. + Some(EntryInOneOfTheLists { entry, set: self }) + } + + /// Call a function on every element in this list. + pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) { + fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) { + let mut node = list.last(); + + while let Some(entry) = node { + ptrs.push(entry.value.with_mut(|ptr| { + let ptr: *mut ManuallyDrop<T> = ptr; + let ptr: *mut T = ptr.cast(); + ptr + })); + + let prev = entry.pointers.get_prev(); + node = prev.map(|prev| unsafe { &*prev.as_ptr() }); + } + } + + // Atomically get a raw pointer to the value of every entry. + // + // Since this only locks the mutex once, it is not possible for a value + // to get moved from the idle list to the notified list during the + // operation, which would otherwise result in some value being listed + // twice. + let mut ptrs = Vec::with_capacity(self.len()); + { + let mut lock = self.lists.lock(); + + get_ptrs(&mut lock.idle, &mut ptrs); + get_ptrs(&mut lock.notified, &mut ptrs); + } + debug_assert_eq!(ptrs.len(), ptrs.capacity()); + + for ptr in ptrs { + // Safety: When we grabbed the pointers, the entries were in one of + // the two lists. This means that their value was valid at the time, + // and it must still be valid because we are the IdleNotifiedSet, + // and only we can remove an entry from the two lists. (It's + // possible that an entry is moved from one list to the other during + // this loop, but that is ok.) + func(unsafe { &mut *ptr }); + } + } + + /// Remove all entries in both lists, applying some function to each element. + /// + /// The closure is called on all elements even if it panics. Having it panic + /// twice is a double-panic, and will abort the application. + pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) { + if self.length == 0 { + // Fast path. + return; + } + self.length = 0; + + // The LinkedList is not cleared on panic, so we use a bomb to clear it. + // + // This value has the invariant that any entry in its `all_entries` list + // has `my_list` set to `Neither` and that the value has not yet been + // dropped. + struct AllEntries<T, F: FnMut(T)> { + all_entries: LinkedList<T>, + func: F, + } + + impl<T, F: FnMut(T)> AllEntries<T, F> { + fn pop_next(&mut self) -> bool { + if let Some(entry) = self.all_entries.pop_back() { + // Safety: We just took this value from the list, so we can + // destroy the value in the entry. + entry + .value + .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) }); + true + } else { + false + } + } + } + + impl<T, F: FnMut(T)> Drop for AllEntries<T, F> { + fn drop(&mut self) { + while self.pop_next() {} + } + } + + let mut all_entries = AllEntries { + all_entries: LinkedList::new(), + func, + }; + + // Atomically move all entries to the new linked list in the AllEntries + // object. + { + let mut lock = self.lists.lock(); + unsafe { + // Safety: We are holding the lock and `all_entries` is a new + // LinkedList. + move_to_new_list(&mut lock.idle, &mut all_entries.all_entries); + move_to_new_list(&mut lock.notified, &mut all_entries.all_entries); + } + } + + // Keep destroying entries in the list until it is empty. + // + // If the closure panics, then the destructor of the `AllEntries` bomb + // ensures that we keep running the destructor on the remaining values. + // A second panic will abort the program. + while all_entries.pop_next() {} + } +} + +/// # Safety +/// +/// The mutex for the entries must be held, and the target list must be such +/// that setting `my_list` to `Neither` is ok. +unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) { + while let Some(entry) = from.pop_back() { + entry.my_list.with_mut(|ptr| { + *ptr = List::Neither; + }); + to.push_front(entry); + } +} + +impl<'a, T> EntryInOneOfTheLists<'a, T> { + /// Remove this entry from the list it is in, returning the value associated + /// with the entry. + /// + /// This consumes the value, since it is no longer guaranteed to be in a + /// list. + pub(crate) fn remove(self) -> T { + self.set.length -= 1; + + { + let mut lock = self.set.lists.lock(); + + // Safety: We are holding the lock so there is no race, and we will + // remove the entry afterwards to uphold invariants. + let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe { + let old_my_list = *ptr; + *ptr = List::Neither; + old_my_list + }); + + let list = match old_my_list { + List::Idle => &mut lock.idle, + List::Notified => &mut lock.notified, + // An entry in one of the lists is in one of the lists. + List::Neither => unreachable!(), + }; + + unsafe { + // Safety: We just checked that the entry is in this particular + // list. + list.remove(ListEntry::as_raw(&self.entry)).unwrap(); + } + } + + // By setting `my_list` to `Neither`, we have taken ownership of the + // value. We return it to the caller. + // + // Safety: We have a mutable reference to the `IdleNotifiedSet` that + // owns this entry, so we can use its permission to access the value. + self.entry + .value + .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) }) + } + + /// Access the value in this entry together with a context for its waker. + pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U + where + F: FnOnce(&mut T, &mut Context<'_>) -> U, + T: 'static, + { + let waker = waker_ref(&self.entry); + + let mut context = Context::from_waker(&waker); + + // Safety: We have a mutable reference to the `IdleNotifiedSet` that + // owns this entry, so we can use its permission to access the value. + self.entry + .value + .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) }) + } +} + +impl<T> Drop for IdleNotifiedSet<T> { + fn drop(&mut self) { + // Clear both lists. + self.drain(drop); + + #[cfg(debug_assertions)] + if !std::thread::panicking() { + let lock = self.lists.lock(); + assert!(lock.idle.is_empty()); + assert!(lock.notified.is_empty()); + } + } +} + +impl<T: 'static> Wake for ListEntry<T> { + fn wake_by_ref(me: &Arc<Self>) { + let mut lock = me.parent.lock(); + + // Safety: We are holding the lock and we will update the lists to + // maintain invariants. + let old_my_list = me.my_list.with_mut(|ptr| unsafe { + let old_my_list = *ptr; + if old_my_list == List::Idle { + *ptr = List::Notified; + } + old_my_list + }); + + if old_my_list == List::Idle { + // We move ourself to the notified list. + let me = unsafe { + // Safety: We just checked that we are in this particular list. + lock.idle.remove(ListEntry::as_raw(me)).unwrap() + }; + lock.notified.push_front(me); + + if let Some(waker) = lock.waker.take() { + drop(lock); + waker.wake(); + } + } + } + + fn wake(me: Arc<Self>) { + Self::wake_by_ref(&me) + } +} + +/// # Safety +/// +/// `ListEntry` is forced to be !Unpin. +unsafe impl<T> linked_list::Link for ListEntry<T> { + type Handle = Arc<ListEntry<T>>; + type Target = ListEntry<T>; + + fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> { + let ptr: *const ListEntry<T> = Arc::as_ptr(handle); + // Safety: We can't get a null pointer from `Arc::as_ptr`. + unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) } + } + + unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> { + Arc::from_raw(ptr.as_ptr()) + } + + unsafe fn pointers( + target: NonNull<ListEntry<T>>, + ) -> NonNull<linked_list::Pointers<ListEntry<T>>> { + ListEntry::addr_of_pointers(target) + } +} + +#[cfg(all(test, not(loom)))] +mod tests { + use crate::runtime::Builder; + use crate::task::JoinSet; + + // A test that runs under miri. + // + // https://github.com/tokio-rs/tokio/pull/5693 + #[test] + fn join_set_test() { + let rt = Builder::new_current_thread().build().unwrap(); + + let mut set = JoinSet::new(); + set.spawn_on(futures::future::ready(()), rt.handle()); + + rt.block_on(set.join_next()).unwrap().unwrap(); + } +} diff --git a/vendor/tokio/src/util/linked_list.rs b/vendor/tokio/src/util/linked_list.rs index a74f56215..ca6ef0e7b 100644 --- a/vendor/tokio/src/util/linked_list.rs +++ b/vendor/tokio/src/util/linked_list.rs @@ -1,6 +1,6 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] -//! An intrusive double linked list of data +//! An intrusive double linked list of data. //! //! The data structure supports tracking pinned nodes. Most of the data //! structure's APIs are `unsafe` as they require the caller to ensure the @@ -46,10 +46,10 @@ pub(crate) unsafe trait Link { /// This is usually a pointer-ish type. type Handle; - /// Node type + /// Node type. type Target; - /// Convert the handle to a raw pointer without consuming the handle + /// Convert the handle to a raw pointer without consuming the handle. #[allow(clippy::wrong_self_convention)] fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target>; @@ -57,10 +57,17 @@ pub(crate) unsafe trait Link { unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle; /// Return the pointers for a node + /// + /// # Safety + /// + /// The resulting pointer should have the same tag in the stacked-borrows + /// stack as the argument. In particular, the method may not create an + /// intermediate reference in the process of creating the resulting raw + /// pointer. unsafe fn pointers(target: NonNull<Self::Target>) -> NonNull<Pointers<Self::Target>>; } -/// Previous / next pointers +/// Previous / next pointers. pub(crate) struct Pointers<T> { inner: UnsafeCell<PointersInner<T>>, } @@ -119,7 +126,7 @@ impl<L: Link> LinkedList<L, L::Target> { pub(crate) fn push_front(&mut self, val: L::Handle) { // The value should not be dropped, it is being inserted into the list let val = ManuallyDrop::new(val); - let ptr = L::as_raw(&*val); + let ptr = L::as_raw(&val); assert_ne!(self.head, Some(ptr)); unsafe { L::pointers(ptr).as_mut().set_next(self.head); @@ -171,8 +178,12 @@ impl<L: Link> LinkedList<L, L::Target> { /// /// # Safety /// - /// The caller **must** ensure that `node` is currently contained by - /// `self` or not contained by any other list. + /// The caller **must** ensure that exactly one of the following is true: + /// - `node` is currently contained by `self`, + /// - `node` is not contained by any list, + /// - `node` is currently contained by some other `GuardedLinkedList` **and** + /// the caller has an exclusive access to that list. This condition is + /// used by the linked list in `sync::Notify`. pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> { if let Some(prev) = L::pointers(node).as_ref().get_prev() { debug_assert_eq!(L::pointers(prev).as_ref().get_next(), Some(node)); @@ -217,8 +228,56 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> { } } +// ===== impl CountedLinkedList ==== + +// Delegates operations to the base LinkedList implementation, and adds a counter to the elements +// in the list. +pub(crate) struct CountedLinkedList<L: Link, T> { + list: LinkedList<L, T>, + count: usize, +} + +impl<L: Link> CountedLinkedList<L, L::Target> { + pub(crate) fn new() -> CountedLinkedList<L, L::Target> { + CountedLinkedList { + list: LinkedList::new(), + count: 0, + } + } + + pub(crate) fn push_front(&mut self, val: L::Handle) { + self.list.push_front(val); + self.count += 1; + } + + pub(crate) fn pop_back(&mut self) -> Option<L::Handle> { + let val = self.list.pop_back(); + if val.is_some() { + self.count -= 1; + } + val + } + + pub(crate) fn is_empty(&self) -> bool { + self.list.is_empty() + } + + pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> { + let val = self.list.remove(node); + if val.is_some() { + self.count -= 1; + } + val + } + + pub(crate) fn count(&self) -> usize { + self.count + } +} + #[cfg(any( feature = "fs", + feature = "rt", all(unix, feature = "process"), feature = "signal", feature = "sync", @@ -236,37 +295,6 @@ impl<L: Link> Default for LinkedList<L, L::Target> { } } -// ===== impl Iter ===== - -cfg_rt_multi_thread! { - pub(crate) struct Iter<'a, T: Link> { - curr: Option<NonNull<T::Target>>, - _p: core::marker::PhantomData<&'a T>, - } - - impl<L: Link> LinkedList<L, L::Target> { - pub(crate) fn iter(&self) -> Iter<'_, L> { - Iter { - curr: self.head, - _p: core::marker::PhantomData, - } - } - } - - impl<'a, T: Link> Iterator for Iter<'a, T> { - type Item = &'a T::Target; - - fn next(&mut self) -> Option<&'a T::Target> { - let curr = self.curr?; - // safety: the pointer references data contained by the list - self.curr = unsafe { T::pointers(curr).as_ref() }.get_next(); - - // safety: the value is still owned by the linked list. - Some(unsafe { &*curr.as_ptr() }) - } - } -} - // ===== impl DrainFilter ===== cfg_io_readiness! { @@ -313,6 +341,126 @@ cfg_io_readiness! { } } +cfg_taskdump! { + impl<T: Link> CountedLinkedList<T, T::Target> { + pub(crate) fn for_each<F>(&mut self, f: F) + where + F: FnMut(&T::Handle), + { + self.list.for_each(f) + } + } + + impl<T: Link> LinkedList<T, T::Target> { + pub(crate) fn for_each<F>(&mut self, mut f: F) + where + F: FnMut(&T::Handle), + { + use std::mem::ManuallyDrop; + + let mut next = self.head; + + while let Some(curr) = next { + unsafe { + let handle = ManuallyDrop::new(T::from_raw(curr)); + f(&handle); + next = T::pointers(curr).as_ref().get_next(); + } + } + } + } +} + +// ===== impl GuardedLinkedList ===== + +feature! { + #![any( + feature = "process", + feature = "sync", + feature = "rt", + feature = "signal", + )] + + /// An intrusive linked list, but instead of keeping pointers to the head + /// and tail nodes, it uses a special guard node linked with those nodes. + /// It means that the list is circular and every pointer of a node from + /// the list is not `None`, including pointers from the guard node. + /// + /// If a list is empty, then both pointers of the guard node are pointing + /// at the guard node itself. + pub(crate) struct GuardedLinkedList<L, T> { + /// Pointer to the guard node. + guard: NonNull<T>, + + /// Node type marker. + _marker: PhantomData<*const L>, + } + + impl<U, L: Link<Handle = NonNull<U>>> LinkedList<L, L::Target> { + /// Turns a linked list into the guarded version by linking the guard node + /// with the head and tail nodes. Like with other nodes, you should guarantee + /// that the guard node is pinned in memory. + pub(crate) fn into_guarded(self, guard_handle: L::Handle) -> GuardedLinkedList<L, L::Target> { + // `guard_handle` is a NonNull pointer, we don't have to care about dropping it. + let guard = L::as_raw(&guard_handle); + + unsafe { + if let Some(head) = self.head { + debug_assert!(L::pointers(head).as_ref().get_prev().is_none()); + L::pointers(head).as_mut().set_prev(Some(guard)); + L::pointers(guard).as_mut().set_next(Some(head)); + + // The list is not empty, so the tail cannot be `None`. + let tail = self.tail.unwrap(); + debug_assert!(L::pointers(tail).as_ref().get_next().is_none()); + L::pointers(tail).as_mut().set_next(Some(guard)); + L::pointers(guard).as_mut().set_prev(Some(tail)); + } else { + // The list is empty. + L::pointers(guard).as_mut().set_prev(Some(guard)); + L::pointers(guard).as_mut().set_next(Some(guard)); + } + } + + GuardedLinkedList { guard, _marker: PhantomData } + } + } + + impl<L: Link> GuardedLinkedList<L, L::Target> { + fn tail(&self) -> Option<NonNull<L::Target>> { + let tail_ptr = unsafe { + L::pointers(self.guard).as_ref().get_prev().unwrap() + }; + + // Compare the tail pointer with the address of the guard node itself. + // If the guard points at itself, then there are no other nodes and + // the list is considered empty. + if tail_ptr != self.guard { + Some(tail_ptr) + } else { + None + } + } + + /// Removes the last element from a list and returns it, or None if it is + /// empty. + pub(crate) fn pop_back(&mut self) -> Option<L::Handle> { + unsafe { + let last = self.tail()?; + let before_last = L::pointers(last).as_ref().get_prev().unwrap(); + + L::pointers(self.guard).as_mut().set_prev(Some(before_last)); + L::pointers(before_last).as_mut().set_next(Some(self.guard)); + + L::pointers(last).as_mut().set_prev(None); + L::pointers(last).as_mut().set_next(None); + + Some(L::from_raw(last)) + } + } + } +} + // ===== impl Pointers ===== impl<T> Pointers<T> { @@ -327,7 +475,7 @@ impl<T> Pointers<T> { } } - fn get_prev(&self) -> Option<NonNull<T>> { + pub(crate) fn get_prev(&self) -> Option<NonNull<T>> { // SAFETY: prev is the first field in PointersInner, which is #[repr(C)]. unsafe { let inner = self.inner.get(); @@ -335,7 +483,7 @@ impl<T> Pointers<T> { ptr::read(prev) } } - fn get_next(&self) -> Option<NonNull<T>> { + pub(crate) fn get_next(&self) -> Option<NonNull<T>> { // SAFETY: next is the second field in PointersInner, which is #[repr(C)]. unsafe { let inner = self.inner.get(); @@ -375,14 +523,15 @@ impl<T> fmt::Debug for Pointers<T> { } } -#[cfg(test)] +#[cfg(any(test, fuzzing))] #[cfg(not(loom))] -mod tests { +pub(crate) mod tests { use super::*; use std::pin::Pin; #[derive(Debug)] + #[repr(C)] struct Entry { pointers: Pointers<Entry>, val: i32, @@ -400,8 +549,8 @@ mod tests { Pin::new_unchecked(&*ptr.as_ptr()) } - unsafe fn pointers(mut target: NonNull<Entry>) -> NonNull<Pointers<Entry>> { - NonNull::from(&mut target.as_mut().pointers) + unsafe fn pointers(target: NonNull<Entry>) -> NonNull<Pointers<Entry>> { + target.cast() } } @@ -435,6 +584,7 @@ mod tests { } } + #[cfg(test)] macro_rules! assert_clean { ($e:ident) => {{ assert!($e.pointers.get_next().is_none()); @@ -442,6 +592,7 @@ mod tests { }}; } + #[cfg(test)] macro_rules! assert_ptr_eq { ($a:expr, $b:expr) => {{ // Deal with mapping a Pin<&mut T> -> Option<NonNull<T>> @@ -646,46 +797,41 @@ mod tests { } #[test] - fn iter() { + fn count() { + let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new(); + assert_eq!(0, list.count()); + let a = entry(5); let b = entry(7); - - let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); - - assert_eq!(0, list.iter().count()); - list.push_front(a.as_ref()); list.push_front(b.as_ref()); + assert_eq!(2, list.count()); - let mut i = list.iter(); - assert_eq!(7, i.next().unwrap().val); - assert_eq!(5, i.next().unwrap().val); - assert!(i.next().is_none()); - } + list.pop_back(); + assert_eq!(1, list.count()); - proptest::proptest! { - #[test] - fn fuzz_linked_list(ops: Vec<usize>) { - run_fuzz(ops); + unsafe { + list.remove(ptr(&b)); } + assert_eq!(0, list.count()); } - fn run_fuzz(ops: Vec<usize>) { - use std::collections::VecDeque; - - #[derive(Debug)] + /// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module. + #[cfg(fuzzing)] + pub fn fuzz_linked_list(ops: &[u8]) { enum Op { Push, Pop, Remove(usize), } + use std::collections::VecDeque; let ops = ops .iter() - .map(|i| match i % 3 { + .map(|i| match i % 3u8 { 0 => Op::Push, 1 => Op::Pop, - 2 => Op::Remove(i / 3), + 2 => Op::Remove((i / 3u8) as usize), _ => unreachable!(), }) .collect::<Vec<_>>(); diff --git a/vendor/tokio/src/util/markers.rs b/vendor/tokio/src/util/markers.rs new file mode 100644 index 000000000..7031fb6bc --- /dev/null +++ b/vendor/tokio/src/util/markers.rs @@ -0,0 +1,8 @@ +/// Marker for types that are `Sync` but not `Send` +pub(crate) struct SyncNotSend(*mut ()); + +unsafe impl Sync for SyncNotSend {} + +cfg_rt! { + pub(crate) struct NotSendOrSync(*mut ()); +} diff --git a/vendor/tokio/src/util/memchr.rs b/vendor/tokio/src/util/memchr.rs new file mode 100644 index 000000000..44fd2da37 --- /dev/null +++ b/vendor/tokio/src/util/memchr.rs @@ -0,0 +1,74 @@ +//! Search for a byte in a byte array using libc. +//! +//! When nothing pulls in libc, then just use a trivial implementation. Note +//! that we only depend on libc on unix. + +#[cfg(not(all(unix, feature = "libc")))] +pub(crate) fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { + haystack.iter().position(|val| needle == *val) +} + +#[cfg(all(unix, feature = "libc"))] +pub(crate) fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { + let start = haystack.as_ptr(); + + // SAFETY: `start` is valid for `haystack.len()` bytes. + let ptr = unsafe { libc::memchr(start.cast(), needle as _, haystack.len()) }; + + if ptr.is_null() { + None + } else { + Some(ptr as usize - start as usize) + } +} + +#[cfg(test)] +mod tests { + use super::memchr; + + #[test] + fn memchr_test() { + let haystack = b"123abc456\0\xffabc\n"; + + assert_eq!(memchr(b'1', haystack), Some(0)); + assert_eq!(memchr(b'2', haystack), Some(1)); + assert_eq!(memchr(b'3', haystack), Some(2)); + assert_eq!(memchr(b'4', haystack), Some(6)); + assert_eq!(memchr(b'5', haystack), Some(7)); + assert_eq!(memchr(b'6', haystack), Some(8)); + assert_eq!(memchr(b'7', haystack), None); + assert_eq!(memchr(b'a', haystack), Some(3)); + assert_eq!(memchr(b'b', haystack), Some(4)); + assert_eq!(memchr(b'c', haystack), Some(5)); + assert_eq!(memchr(b'd', haystack), None); + assert_eq!(memchr(b'A', haystack), None); + assert_eq!(memchr(0, haystack), Some(9)); + assert_eq!(memchr(0xff, haystack), Some(10)); + assert_eq!(memchr(0xfe, haystack), None); + assert_eq!(memchr(1, haystack), None); + assert_eq!(memchr(b'\n', haystack), Some(14)); + assert_eq!(memchr(b'\r', haystack), None); + } + + #[test] + fn memchr_all() { + let mut arr = Vec::new(); + for b in 0..=255 { + arr.push(b); + } + for b in 0..=255 { + assert_eq!(memchr(b, &arr), Some(b as usize)); + } + arr.reverse(); + for b in 0..=255 { + assert_eq!(memchr(b, &arr), Some(255 - b as usize)); + } + } + + #[test] + fn memchr_empty() { + for b in 0..=255 { + assert_eq!(memchr(b, b""), None); + } + } +} diff --git a/vendor/tokio/src/util/mod.rs b/vendor/tokio/src/util/mod.rs index b267125b1..6a7d4b103 100644 --- a/vendor/tokio/src/util/mod.rs +++ b/vendor/tokio/src/util/mod.rs @@ -3,6 +3,40 @@ cfg_io_driver! { pub(crate) mod slab; } +#[cfg(feature = "rt")] +pub(crate) mod atomic_cell; + +#[cfg(any( + feature = "rt", + feature = "signal", + feature = "process", + tokio_no_const_mutex_new, +))] +pub(crate) mod once_cell; + +#[cfg(any( + // io driver uses `WakeList` directly + feature = "net", + feature = "process", + // `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`. + feature = "sync", + // `fs` uses `batch_semaphore`, which requires `WakeList`. + feature = "fs", + // rt and signal use `Notify`, which requires `WakeList`. + feature = "rt", + feature = "signal", +))] +mod wake_list; +#[cfg(any( + feature = "net", + feature = "process", + feature = "sync", + feature = "fs", + feature = "rt", + feature = "signal", +))] +pub(crate) use wake_list::WakeList; + #[cfg(any( feature = "fs", feature = "net", @@ -14,33 +48,36 @@ cfg_io_driver! { ))] pub(crate) mod linked_list; -#[cfg(any(feature = "rt-multi-thread", feature = "macros"))] -mod rand; +#[cfg(any(feature = "rt", feature = "macros"))] +pub(crate) mod rand; cfg_rt! { + mod idle_notified_set; + pub(crate) use idle_notified_set::IdleNotifiedSet; + + pub(crate) use self::rand::RngSeedGenerator; + mod wake; pub(crate) use wake::WakerRef; pub(crate) use wake::{waker_ref, Wake}; + + mod sync_wrapper; + pub(crate) use sync_wrapper::SyncWrapper; + + mod rc_cell; + pub(crate) use rc_cell::RcCell; } cfg_rt_multi_thread! { - pub(crate) use self::rand::FastRand; - mod try_lock; pub(crate) use try_lock::TryLock; } pub(crate) mod trace; -#[cfg(any(feature = "macros"))] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use self::rand::thread_rng_n; - -#[cfg(any( - feature = "rt", - feature = "time", - feature = "net", - feature = "process", - all(unix, feature = "signal") -))] pub(crate) mod error; + +#[cfg(feature = "io-util")] +pub(crate) mod memchr; + +pub(crate) mod markers; diff --git a/vendor/tokio/src/util/once_cell.rs b/vendor/tokio/src/util/once_cell.rs new file mode 100644 index 000000000..71fc00758 --- /dev/null +++ b/vendor/tokio/src/util/once_cell.rs @@ -0,0 +1,70 @@ +#![allow(dead_code)] +use std::cell::UnsafeCell; +use std::mem::MaybeUninit; +use std::sync::Once; + +pub(crate) struct OnceCell<T> { + once: Once, + value: UnsafeCell<MaybeUninit<T>>, +} + +unsafe impl<T: Send + Sync> Send for OnceCell<T> {} +unsafe impl<T: Send + Sync> Sync for OnceCell<T> {} + +impl<T> OnceCell<T> { + pub(crate) const fn new() -> Self { + Self { + once: Once::new(), + value: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + /// Get the value inside this cell, initializing it using the provided + /// function if necessary. + /// + /// If the `init` closure panics, then the `OnceCell` is poisoned and all + /// future calls to `get` will panic. + #[inline] + pub(crate) fn get(&self, init: impl FnOnce() -> T) -> &T { + if !self.once.is_completed() { + self.do_init(init); + } + + // Safety: The `std::sync::Once` guarantees that we can only reach this + // line if a `call_once` closure has been run exactly once and without + // panicking. Thus, the value is not uninitialized. + // + // There is also no race because the only `&self` method that modifies + // `value` is `do_init`, but if the `call_once` closure is still + // running, then no thread has gotten past the `call_once`. + unsafe { &*(self.value.get() as *const T) } + } + + #[cold] + fn do_init(&self, init: impl FnOnce() -> T) { + let value_ptr = self.value.get() as *mut T; + + self.once.call_once(|| { + let set_to = init(); + + // Safety: The `std::sync::Once` guarantees that this initialization + // will run at most once, and that no thread can get past the + // `call_once` until it has run exactly once. Thus, we have + // exclusive access to `value`. + unsafe { + std::ptr::write(value_ptr, set_to); + } + }); + } +} + +impl<T> Drop for OnceCell<T> { + fn drop(&mut self) { + if self.once.is_completed() { + let value_ptr = self.value.get() as *mut T; + unsafe { + std::ptr::drop_in_place(value_ptr); + } + } + } +} diff --git a/vendor/tokio/src/util/pad.rs b/vendor/tokio/src/util/pad.rs deleted file mode 100644 index bf0913ca8..000000000 --- a/vendor/tokio/src/util/pad.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::fmt; -use core::ops::{Deref, DerefMut}; - -#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] -// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache -// lines at a time, so we have to align to 128 bytes rather than 64. -// -// Sources: -// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf -// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 -#[cfg_attr(target_arch = "x86_64", repr(align(128)))] -#[cfg_attr(not(target_arch = "x86_64"), repr(align(64)))] -pub(crate) struct CachePadded<T> { - value: T, -} - -unsafe impl<T: Send> Send for CachePadded<T> {} -unsafe impl<T: Sync> Sync for CachePadded<T> {} - -impl<T> CachePadded<T> { - pub(crate) fn new(t: T) -> CachePadded<T> { - CachePadded::<T> { value: t } - } -} - -impl<T> Deref for CachePadded<T> { - type Target = T; - - fn deref(&self) -> &T { - &self.value - } -} - -impl<T> DerefMut for CachePadded<T> { - fn deref_mut(&mut self) -> &mut T { - &mut self.value - } -} - -impl<T: fmt::Debug> fmt::Debug for CachePadded<T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CachePadded") - .field("value", &self.value) - .finish() - } -} - -impl<T> From<T> for CachePadded<T> { - fn from(t: T) -> Self { - CachePadded::new(t) - } -} diff --git a/vendor/tokio/src/util/rand.rs b/vendor/tokio/src/util/rand.rs index 17b3ec1ae..d96c8d37e 100644 --- a/vendor/tokio/src/util/rand.rs +++ b/vendor/tokio/src/util/rand.rs @@ -1,21 +1,43 @@ -use std::cell::Cell; +cfg_rt! { + mod rt; + pub(crate) use rt::RngSeedGenerator; -/// Fast random number generate + cfg_unstable! { + mod rt_unstable; + } +} + +/// A seed for random number generation. +/// +/// In order to make certain functions within a runtime deterministic, a seed +/// can be specified at the time of creation. +#[allow(unreachable_pub)] +#[derive(Clone, Debug)] +pub struct RngSeed { + s: u32, + r: u32, +} + +/// Fast random number generate. /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> /// This generator passes the SmallCrush suite, part of TestU01 framework: /// <http://simul.iro.umontreal.ca/testu01/tu01.html> -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] pub(crate) struct FastRand { - one: Cell<u32>, - two: Cell<u32>, + one: u32, + two: u32, } -impl FastRand { - /// Initialize a new, thread-local, fast random number generator. - pub(crate) fn new(seed: u64) -> FastRand { +impl RngSeed { + /// Creates a random seed using loom internally. + pub(crate) fn new() -> Self { + Self::from_u64(crate::loom::rand::seed()) + } + + fn from_u64(seed: u64) -> Self { let one = (seed >> 32) as u32; let mut two = seed as u32; @@ -24,41 +46,50 @@ impl FastRand { two = 1; } + Self::from_pair(one, two) + } + + fn from_pair(s: u32, r: u32) -> Self { + Self { s, r } + } +} + +impl FastRand { + /// Initialize a new fast random number generator using the default source of entropy. + pub(crate) fn new() -> FastRand { + FastRand::from_seed(RngSeed::new()) + } + + /// Initializes a new, thread-local, fast random number generator. + pub(crate) fn from_seed(seed: RngSeed) -> FastRand { FastRand { - one: Cell::new(one), - two: Cell::new(two), + one: seed.s, + two: seed.r, } } - pub(crate) fn fastrand_n(&self, n: u32) -> u32 { + #[cfg(any( + feature = "macros", + feature = "rt-multi-thread", + all(feature = "sync", feature = "rt") + ))] + pub(crate) fn fastrand_n(&mut self, n: u32) -> u32 { // This is similar to fastrand() % n, but faster. // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ let mul = (self.fastrand() as u64).wrapping_mul(n as u64); (mul >> 32) as u32 } - fn fastrand(&self) -> u32 { - let mut s1 = self.one.get(); - let s0 = self.two.get(); + fn fastrand(&mut self) -> u32 { + let mut s1 = self.one; + let s0 = self.two; s1 ^= s1 << 17; s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; - self.one.set(s0); - self.two.set(s1); + self.one = s0; + self.two = s1; s0.wrapping_add(s1) } } - -// Used by the select macro and `StreamMap` -#[cfg(any(feature = "macros"))] -#[doc(hidden)] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub fn thread_rng_n(n: u32) -> u32 { - thread_local! { - static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); - } - - THREAD_RNG.with(|rng| rng.fastrand_n(n)) -} diff --git a/vendor/tokio/src/util/rand/rt.rs b/vendor/tokio/src/util/rand/rt.rs new file mode 100644 index 000000000..6584ba4f2 --- /dev/null +++ b/vendor/tokio/src/util/rand/rt.rs @@ -0,0 +1,61 @@ +use super::{FastRand, RngSeed}; + +use std::sync::Mutex; + +/// A deterministic generator for seeds (and other generators). +/// +/// Given the same initial seed, the generator will output the same sequence of seeds. +/// +/// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` +/// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a +/// thread local store, the expectation is that seed generation will not need to happen +/// very frequently, so the cost of the mutex should be minimal. +#[derive(Debug)] +pub(crate) struct RngSeedGenerator { + /// Internal state for the seed generator. We keep it in a Mutex so that we can safely + /// use it across multiple threads. + state: Mutex<FastRand>, +} + +impl RngSeedGenerator { + /// Returns a new generator from the provided seed. + pub(crate) fn new(seed: RngSeed) -> Self { + Self { + state: Mutex::new(FastRand::from_seed(seed)), + } + } + + /// Returns the next seed in the sequence. + pub(crate) fn next_seed(&self) -> RngSeed { + let mut rng = self + .state + .lock() + .expect("RNG seed generator is internally corrupt"); + + let s = rng.fastrand(); + let r = rng.fastrand(); + + RngSeed::from_pair(s, r) + } + + /// Directly creates a generator using the next seed. + pub(crate) fn next_generator(&self) -> Self { + RngSeedGenerator::new(self.next_seed()) + } +} + +impl FastRand { + /// Replaces the state of the random number generator with the provided seed, returning + /// the seed that represents the previous state of the random number generator. + /// + /// The random number generator will become equivalent to one created with + /// the same seed. + pub(crate) fn replace_seed(&mut self, seed: RngSeed) -> RngSeed { + let old_seed = RngSeed::from_pair(self.one, self.two); + + self.one = seed.s; + self.two = seed.r; + + old_seed + } +} diff --git a/vendor/tokio/src/util/rand/rt_unstable.rs b/vendor/tokio/src/util/rand/rt_unstable.rs new file mode 100644 index 000000000..3a8613eb0 --- /dev/null +++ b/vendor/tokio/src/util/rand/rt_unstable.rs @@ -0,0 +1,20 @@ +use super::RngSeed; + +use std::collections::hash_map::DefaultHasher; +use std::hash::Hasher; + +impl RngSeed { + /// Generates a seed from the provided byte slice. + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime::RngSeed; + /// let seed = RngSeed::from_bytes(b"make me a seed"); + /// ``` + pub fn from_bytes(bytes: &[u8]) -> Self { + let mut hasher = DefaultHasher::default(); + hasher.write(bytes); + Self::from_u64(hasher.finish()) + } +} diff --git a/vendor/tokio/src/util/rc_cell.rs b/vendor/tokio/src/util/rc_cell.rs new file mode 100644 index 000000000..447249268 --- /dev/null +++ b/vendor/tokio/src/util/rc_cell.rs @@ -0,0 +1,57 @@ +use crate::loom::cell::UnsafeCell; + +use std::rc::Rc; + +/// This is exactly like `Cell<Option<Rc<T>>>`, except that it provides a `get` +/// method even though `Rc` is not `Copy`. +pub(crate) struct RcCell<T> { + inner: UnsafeCell<Option<Rc<T>>>, +} + +impl<T> RcCell<T> { + #[cfg(not(all(loom, test)))] + pub(crate) const fn new() -> Self { + Self { + inner: UnsafeCell::new(None), + } + } + + // The UnsafeCell in loom does not have a const `new` fn. + #[cfg(all(loom, test))] + pub(crate) fn new() -> Self { + Self { + inner: UnsafeCell::new(None), + } + } + + /// Safety: This method may not be called recursively. + #[inline] + unsafe fn with_inner<F, R>(&self, f: F) -> R + where + F: FnOnce(&mut Option<Rc<T>>) -> R, + { + // safety: This type is not Sync, so concurrent calls of this method + // cannot happen. Furthermore, the caller guarantees that the method is + // not called recursively. Finally, this is the only place that can + // create mutable references to the inner Rc. This ensures that any + // mutable references created here are exclusive. + self.inner.with_mut(|ptr| f(&mut *ptr)) + } + + pub(crate) fn get(&self) -> Option<Rc<T>> { + // safety: The `Rc::clone` method will not call any unknown user-code, + // so it will not result in a recursive call to `with_inner`. + unsafe { self.with_inner(|rc| rc.clone()) } + } + + pub(crate) fn replace(&self, val: Option<Rc<T>>) -> Option<Rc<T>> { + // safety: No destructors or other unknown user-code will run inside the + // `with_inner` call, so no recursive call to `with_inner` can happen. + unsafe { self.with_inner(|rc| std::mem::replace(rc, val)) } + } + + pub(crate) fn set(&self, val: Option<Rc<T>>) { + let old = self.replace(val); + drop(old); + } +} diff --git a/vendor/tokio/src/util/slab.rs b/vendor/tokio/src/util/slab.rs index 2ddaa6c74..cc1208ebe 100644 --- a/vendor/tokio/src/util/slab.rs +++ b/vendor/tokio/src/util/slab.rs @@ -85,11 +85,11 @@ pub(crate) struct Address(usize); /// An entry in the slab. pub(crate) trait Entry: Default { - /// Reset the entry's value and track the generation. + /// Resets the entry's value and track the generation. fn reset(&self); } -/// A reference to a value stored in the slab +/// A reference to a value stored in the slab. pub(crate) struct Ref<T> { value: *const Value<T>, } @@ -101,9 +101,9 @@ const NUM_PAGES: usize = 19; const PAGE_INITIAL_SIZE: usize = 32; const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1; -/// A page in the slab +/// A page in the slab. struct Page<T> { - /// Slots + /// Slots. slots: Mutex<Slots<T>>, // Number of slots currently being used. This is not guaranteed to be up to @@ -116,7 +116,7 @@ struct Page<T> { // The number of slots the page can hold. len: usize, - // Length of all previous pages combined + // Length of all previous pages combined. prev_len: usize, } @@ -128,9 +128,9 @@ struct CachedPage<T> { init: usize, } -/// Page state +/// Page state. struct Slots<T> { - /// Slots + /// Slots. slots: Vec<Slot<T>>, head: usize, @@ -157,11 +157,17 @@ struct Slot<T> { /// Next entry in the free list. next: u32, + + /// Makes miri happy by making mutable references not take exclusive access. + /// + /// Could probably also be fixed by replacing `slots` with a raw-pointer + /// based equivalent. + _pin: std::marker::PhantomPinned, } -/// Value paired with a reference to the page +/// Value paired with a reference to the page. struct Value<T> { - /// Value stored in the value + /// Value stored in the value. value: T, /// Pointer to the page containing the slot. @@ -171,7 +177,7 @@ struct Value<T> { } impl<T> Slab<T> { - /// Create a new, empty, slab + /// Create a new, empty, slab. pub(crate) fn new() -> Slab<T> { // Initializing arrays is a bit annoying. Instead of manually writing // out an array and every single entry, `Default::default()` is used to @@ -409,7 +415,7 @@ impl<T: Entry> Page<T> { slot.value.with(|ptr| unsafe { (*ptr).value.reset() }); // Return a reference to the slot - Some((me.addr(idx), slot.gen_ref(me))) + Some((me.addr(idx), locked.gen_ref(idx, me))) } else if me.len == locked.slots.len() { // The page is full None @@ -428,9 +434,10 @@ impl<T: Entry> Page<T> { locked.slots.push(Slot { value: UnsafeCell::new(Value { value: Default::default(), - page: &**me as *const _, + page: Arc::as_ptr(me), }), next: 0, + _pin: std::marker::PhantomPinned, }); // Increment the head to indicate the free stack is empty @@ -443,7 +450,7 @@ impl<T: Entry> Page<T> { debug_assert_eq!(locked.slots.len(), locked.head); - Some((me.addr(idx), locked.slots[idx].gen_ref(me))) + Some((me.addr(idx), locked.gen_ref(idx, me))) } } } @@ -455,7 +462,7 @@ impl<T> Page<T> { addr.0 - self.prev_len } - /// Returns the address for the given slot + /// Returns the address for the given slot. fn addr(&self, slot: usize) -> Address { Address(slot + self.prev_len) } @@ -478,7 +485,7 @@ impl<T> Default for Page<T> { } impl<T> Page<T> { - /// Release a slot into the page's free list + /// Release a slot into the page's free list. fn release(&self, value: *const Value<T>) { let mut locked = self.slots.lock(); @@ -492,7 +499,7 @@ impl<T> Page<T> { } impl<T> CachedPage<T> { - /// Refresh the cache + /// Refreshes the cache. fn refresh(&mut self, page: &Page<T>) { let slots = page.slots.lock(); @@ -502,7 +509,7 @@ impl<T> CachedPage<T> { } } - // Get a value by index + /// Gets a value by index. fn get(&self, idx: usize) -> &T { assert!(idx < self.init); @@ -544,39 +551,35 @@ impl<T> Slots<T> { fn index_for(&self, slot: *const Value<T>) -> usize { use std::mem; - let base = &self.slots[0] as *const _ as usize; - - assert!(base != 0, "page is unallocated"); + assert_ne!(self.slots.capacity(), 0, "page is unallocated"); + let base = self.slots.as_ptr() as usize; let slot = slot as usize; let width = mem::size_of::<Slot<T>>(); assert!(slot >= base, "unexpected pointer"); let idx = (slot - base) / width; - assert!(idx < self.slots.len() as usize); + assert!(idx < self.slots.len()); idx } -} -impl<T: Entry> Slot<T> { - /// Generates a `Ref` for the slot. This involves bumping the page's ref count. - fn gen_ref(&self, page: &Arc<Page<T>>) -> Ref<T> { - // The ref holds a ref on the page. The `Arc` is forgotten here and is - // resurrected in `release` when the `Ref` is dropped. By avoiding to - // hold on to an explicit `Arc` value, the struct size of `Ref` is - // reduced. + /// Generates a `Ref` for the slot at the given index. This involves bumping the page's ref count. + fn gen_ref(&self, idx: usize, page: &Arc<Page<T>>) -> Ref<T> { + assert!(idx < self.slots.len()); mem::forget(page.clone()); - let slot = self as *const Slot<T>; - let value = slot as *const Value<T>; + + let vec_ptr = self.slots.as_ptr(); + let slot: *const Slot<T> = unsafe { vec_ptr.add(idx) }; + let value: *const Value<T> = slot as *const Value<T>; Ref { value } } } impl<T> Value<T> { - // Release the slot, returning the `Arc<Page<T>>` logically owned by the ref. + /// Releases the slot, returning the `Arc<Page<T>>` logically owned by the ref. fn release(&self) -> Arc<Page<T>> { // Safety: called by `Ref`, which owns an `Arc<Page<T>>` instance. let page = unsafe { Arc::from_raw(self.page) }; @@ -691,11 +694,13 @@ mod test { #[test] fn insert_many() { + const MANY: usize = normal_or_miri(10_000, 50); + let mut slab = Slab::<Foo>::new(); let alloc = slab.allocator(); let mut entries = vec![]; - for i in 0..10_000 { + for i in 0..MANY { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); entries.push((addr, val)); @@ -708,15 +713,15 @@ mod test { entries.clear(); - for i in 0..10_000 { + for i in 0..MANY { let (addr, val) = alloc.allocate().unwrap(); - val.id.store(10_000 - i, SeqCst); + val.id.store(MANY - i, SeqCst); entries.push((addr, val)); } for (i, (addr, v)) in entries.iter().enumerate() { - assert_eq!(10_000 - i, v.id.load(SeqCst)); - assert_eq!(10_000 - i, slab.get(*addr).unwrap().id.load(SeqCst)); + assert_eq!(MANY - i, v.id.load(SeqCst)); + assert_eq!(MANY - i, slab.get(*addr).unwrap().id.load(SeqCst)); } } @@ -726,7 +731,7 @@ mod test { let alloc = slab.allocator(); let mut entries = vec![]; - for i in 0..10_000 { + for i in 0..normal_or_miri(10_000, 100) { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); entries.push((addr, val)); @@ -734,7 +739,7 @@ mod test { for _ in 0..10 { // Drop 1000 in reverse - for _ in 0..1_000 { + for _ in 0..normal_or_miri(1_000, 10) { entries.pop(); } @@ -753,7 +758,7 @@ mod test { let mut entries1 = vec![]; let mut entries2 = vec![]; - for i in 0..10_000 { + for i in 0..normal_or_miri(10_000, 100) { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); @@ -771,6 +776,14 @@ mod test { } } + const fn normal_or_miri(normal: usize, miri: usize) -> usize { + if cfg!(miri) { + miri + } else { + normal + } + } + #[test] fn compact_all() { let mut slab = Slab::<Foo>::new(); @@ -780,7 +793,7 @@ mod test { for _ in 0..2 { entries.clear(); - for i in 0..10_000 { + for i in 0..normal_or_miri(10_000, 100) { let (addr, val) = alloc.allocate().unwrap(); val.id.store(i, SeqCst); @@ -808,7 +821,7 @@ mod test { let alloc = slab.allocator(); let mut entries = vec![]; - for _ in 0..5 { + for _ in 0..normal_or_miri(5, 2) { entries.clear(); // Allocate a few pages + 1 diff --git a/vendor/tokio/src/util/sync_wrapper.rs b/vendor/tokio/src/util/sync_wrapper.rs new file mode 100644 index 000000000..5ffc8f96b --- /dev/null +++ b/vendor/tokio/src/util/sync_wrapper.rs @@ -0,0 +1,26 @@ +//! This module contains a type that can make `Send + !Sync` types `Sync` by +//! disallowing all immutable access to the value. +//! +//! A similar primitive is provided in the `sync_wrapper` crate. + +pub(crate) struct SyncWrapper<T> { + value: T, +} + +// safety: The SyncWrapper being send allows you to send the inner value across +// thread boundaries. +unsafe impl<T: Send> Send for SyncWrapper<T> {} + +// safety: An immutable reference to a SyncWrapper is useless, so moving such an +// immutable reference across threads is safe. +unsafe impl<T> Sync for SyncWrapper<T> {} + +impl<T> SyncWrapper<T> { + pub(crate) fn new(value: T) -> Self { + Self { value } + } + + pub(crate) fn into_inner(self) -> T { + self.value + } +} diff --git a/vendor/tokio/src/util/trace.rs b/vendor/tokio/src/util/trace.rs index c51a5a72b..76e8a6cbf 100644 --- a/vendor/tokio/src/util/trace.rs +++ b/vendor/tokio/src/util/trace.rs @@ -1,37 +1,98 @@ cfg_trace! { cfg_rt! { + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use pin_project_lite::pin_project; + use std::future::Future; pub(crate) use tracing::instrument::Instrumented; #[inline] - #[cfg_attr(tokio_track_caller, track_caller)] - pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>) -> Instrumented<F> { + #[track_caller] + pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented<F> { use tracing::instrument::Instrument; - #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); - #[cfg(tokio_track_caller)] let span = tracing::trace_span!( target: "tokio::task", - "task", + "runtime.spawn", %kind, - spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), - task.name = %name.unwrap_or_default() - ); - #[cfg(not(tokio_track_caller))] - let span = tracing::trace_span!( - target: "tokio::task", - "task", - %kind, - task.name = %name.unwrap_or_default() + task.name = %name.unwrap_or_default(), + task.id = id, + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), ); task.instrument(span) } + + pub(crate) fn async_op<P,F>(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp<F> + where P: FnOnce() -> F { + resource_span.in_scope(|| { + let async_op_span = tracing::trace_span!("runtime.resource.async_op", source = source, inherits_child_attrs = inherits_child_attrs); + let enter = async_op_span.enter(); + let async_op_poll_span = tracing::trace_span!("runtime.resource.async_op.poll"); + let inner = inner(); + drop(enter); + let tracing_ctx = AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span: resource_span.clone(), + }; + InstrumentedAsyncOp { + inner, + tracing_ctx, + poll_op_name, + } + }) + } + + #[derive(Debug, Clone)] + pub(crate) struct AsyncOpTracingCtx { + pub(crate) async_op_span: tracing::Span, + pub(crate) async_op_poll_span: tracing::Span, + pub(crate) resource_span: tracing::Span, + } + + + pin_project! { + #[derive(Debug, Clone)] + pub(crate) struct InstrumentedAsyncOp<F> { + #[pin] + pub(crate) inner: F, + pub(crate) tracing_ctx: AsyncOpTracingCtx, + pub(crate) poll_op_name: &'static str + } + } + + impl<F: Future> Future for InstrumentedAsyncOp<F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.project(); + let poll_op_name = &*this.poll_op_name; + let _res_enter = this.tracing_ctx.resource_span.enter(); + let _async_op_enter = this.tracing_ctx.async_op_span.enter(); + let _async_op_poll_enter = this.tracing_ctx.async_op_poll_span.enter(); + trace_poll_op!(poll_op_name, this.inner.poll(cx)) + } + } + } +} +cfg_time! { + #[track_caller] + pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + return Some(std::panic::Location::caller()); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + None } } cfg_not_trace! { cfg_rt! { #[inline] - pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>) -> F { + pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>, _: u64) -> F { // nop task } diff --git a/vendor/tokio/src/util/wake.rs b/vendor/tokio/src/util/wake.rs index 57739371d..5526cbc63 100644 --- a/vendor/tokio/src/util/wake.rs +++ b/vendor/tokio/src/util/wake.rs @@ -1,15 +1,16 @@ +use crate::loom::sync::Arc; + use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::ops::Deref; -use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable, Waker}; -/// Simplified waking interface based on Arcs -pub(crate) trait Wake: Send + Sync { - /// Wake by value - fn wake(self: Arc<Self>); +/// Simplified waking interface based on Arcs. +pub(crate) trait Wake: Send + Sync + Sized + 'static { + /// Wake by value. + fn wake(arc_self: Arc<Self>); - /// Wake by reference + /// Wake by reference. fn wake_by_ref(arc_self: &Arc<Self>); } @@ -30,7 +31,7 @@ impl Deref for WakerRef<'_> { /// Creates a reference to a `Waker` from a reference to `Arc<impl Wake>`. pub(crate) fn waker_ref<W: Wake>(wake: &Arc<W>) -> WakerRef<'_> { - let ptr = &**wake as *const _ as *const (); + let ptr = Arc::as_ptr(wake) as *const (); let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) }; diff --git a/vendor/tokio/src/util/wake_list.rs b/vendor/tokio/src/util/wake_list.rs new file mode 100644 index 000000000..aa569dd17 --- /dev/null +++ b/vendor/tokio/src/util/wake_list.rs @@ -0,0 +1,53 @@ +use core::mem::MaybeUninit; +use core::ptr; +use std::task::Waker; + +const NUM_WAKERS: usize = 32; + +pub(crate) struct WakeList { + inner: [MaybeUninit<Waker>; NUM_WAKERS], + curr: usize, +} + +impl WakeList { + pub(crate) fn new() -> Self { + Self { + inner: unsafe { + // safety: Create an uninitialized array of `MaybeUninit`. The + // `assume_init` is safe because the type we are claiming to + // have initialized here is a bunch of `MaybeUninit`s, which do + // not require initialization. + MaybeUninit::uninit().assume_init() + }, + curr: 0, + } + } + + #[inline] + pub(crate) fn can_push(&self) -> bool { + self.curr < NUM_WAKERS + } + + pub(crate) fn push(&mut self, val: Waker) { + debug_assert!(self.can_push()); + + self.inner[self.curr] = MaybeUninit::new(val); + self.curr += 1; + } + + pub(crate) fn wake_all(&mut self) { + assert!(self.curr <= NUM_WAKERS); + while self.curr > 0 { + self.curr -= 1; + let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) }; + waker.wake(); + } + } +} + +impl Drop for WakeList { + fn drop(&mut self) { + let slice = ptr::slice_from_raw_parts_mut(self.inner.as_mut_ptr() as *mut Waker, self.curr); + unsafe { ptr::drop_in_place(slice) }; + } +} |