//! This module defines an `IdleNotifiedSet`, which is a collection of elements. //! Each element is intended to correspond to a task, and the collection will //! keep track of which tasks have had their waker notified, and which have not. //! //! Each entry in the set holds some user-specified value. The value's type is //! specified using the `T` parameter. It will usually be a `JoinHandle` or //! similar. use std::marker::PhantomPinned; use std::mem::ManuallyDrop; use std::ptr::NonNull; use std::task::{Context, Waker}; use crate::loom::cell::UnsafeCell; use crate::loom::sync::{Arc, Mutex}; use crate::util::linked_list::{self, Link}; use crate::util::{waker_ref, Wake}; type LinkedList = linked_list::LinkedList, as linked_list::Link>::Target>; /// This is the main handle to the collection. pub(crate) struct IdleNotifiedSet { lists: Arc>, length: usize, } /// A handle to an entry that is guaranteed to be stored in the idle or notified /// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet` /// mutably to prevent the entry from being moved to the `Neither` list, which /// only the `IdleNotifiedSet` may do. /// /// The main consequence of being stored in one of the lists is that the `value` /// field has not yet been consumed. /// /// Note: This entry can be moved from the idle to the notified list while this /// object exists by waking its waker. pub(crate) struct EntryInOneOfTheLists<'a, T> { entry: Arc>, set: &'a mut IdleNotifiedSet, } type Lists = Mutex>; /// The linked lists hold strong references to the ListEntry items, and the /// ListEntry items also hold a strong reference back to the Lists object, but /// the destructor of the `IdleNotifiedSet` will clear the two lists, so once /// that object is destroyed, no ref-cycles will remain. struct ListsInner { notified: LinkedList, idle: LinkedList, /// Whenever an element in the `notified` list is woken, this waker will be /// notified and consumed, if it exists. waker: Option, } /// Which of the two lists in the shared Lists object is this entry stored in? /// /// If the value is `Idle`, then an entry's waker may move it to the notified /// list. Otherwise, only the `IdleNotifiedSet` may move it. /// /// If the value is `Neither`, then it is still possible that the entry is in /// some third external list (this happens in `drain`). #[derive(Copy, Clone, Eq, PartialEq)] enum List { Notified, Idle, Neither, } /// An entry in the list. /// /// # Safety /// /// The `my_list` field must only be accessed while holding the mutex in /// `parent`. It is an invariant that the value of `my_list` corresponds to /// which linked list in the `parent` holds this entry. Once this field takes /// the value `Neither`, then it may never be modified again. /// /// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field /// must only be accessed while holding the mutex. If the value of `my_list` is /// `Neither`, then the `pointers` field may be accessed by the /// `IdleNotifiedSet` (this happens inside `drain`). /// /// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed /// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to /// `Neither` assumes ownership of the `value`, and it must either drop it or /// move it out from this entry to prevent it from getting leaked. (Since the /// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the /// value should not be leaked.) struct ListEntry { /// The linked list pointers of the list this entry is in. pointers: linked_list::Pointers>, /// Pointer to the shared `Lists` struct. parent: Arc>, /// The value stored in this entry. value: UnsafeCell>, /// Used to remember which list this entry is in. my_list: UnsafeCell, /// Required by the `linked_list::Pointers` field. _pin: PhantomPinned, } generate_addr_of_methods! { impl ListEntry { unsafe fn addr_of_pointers(self: NonNull) -> NonNull>> { &self.pointers } } } // With mutable access to the `IdleNotifiedSet`, you can get mutable access to // the values. unsafe impl Send for IdleNotifiedSet {} // With the current API we strictly speaking don't even need `T: Sync`, but we // require it anyway to support adding &self APIs that access the values in the // future. unsafe impl Sync for IdleNotifiedSet {} // These impls control when it is safe to create a Waker. Since the waker does // not allow access to the value in any way (including its destructor), it is // not necessary for `T` to be Send or Sync. unsafe impl Send for ListEntry {} unsafe impl Sync for ListEntry {} impl IdleNotifiedSet { /// Create a new IdleNotifiedSet. pub(crate) fn new() -> Self { let lists = Mutex::new(ListsInner { notified: LinkedList::new(), idle: LinkedList::new(), waker: None, }); IdleNotifiedSet { lists: Arc::new(lists), length: 0, } } pub(crate) fn len(&self) -> usize { self.length } pub(crate) fn is_empty(&self) -> bool { self.length == 0 } /// Insert the given value into the `idle` list. pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> { self.length += 1; let entry = Arc::new(ListEntry { parent: self.lists.clone(), value: UnsafeCell::new(ManuallyDrop::new(value)), my_list: UnsafeCell::new(List::Idle), pointers: linked_list::Pointers::new(), _pin: PhantomPinned, }); { let mut lock = self.lists.lock(); lock.idle.push_front(entry.clone()); } // Safety: We just put the entry in the idle list, so it is in one of the lists. EntryInOneOfTheLists { entry, set: self } } /// Pop an entry from the notified list to poll it. The entry is moved to /// the idle list atomically. pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option> { // We don't decrement the length because this call moves the entry to // the idle list rather than removing it. if self.length == 0 { // Fast path. return None; } let mut lock = self.lists.lock(); let should_update_waker = match lock.waker.as_mut() { Some(cur_waker) => !waker.will_wake(cur_waker), None => true, }; if should_update_waker { lock.waker = Some(waker.clone()); } // Pop the entry, returning None if empty. let entry = lock.notified.pop_back()?; lock.idle.push_front(entry.clone()); // Safety: We are holding the lock. entry.my_list.with_mut(|ptr| unsafe { *ptr = List::Idle; }); drop(lock); // Safety: We just put the entry in the idle list, so it is in one of the lists. Some(EntryInOneOfTheLists { entry, set: self }) } /// Call a function on every element in this list. pub(crate) fn for_each(&mut self, mut func: F) { fn get_ptrs(list: &mut LinkedList, ptrs: &mut Vec<*mut T>) { let mut node = list.last(); while let Some(entry) = node { ptrs.push(entry.value.with_mut(|ptr| { let ptr: *mut ManuallyDrop = ptr; let ptr: *mut T = ptr.cast(); ptr })); let prev = entry.pointers.get_prev(); node = prev.map(|prev| unsafe { &*prev.as_ptr() }); } } // Atomically get a raw pointer to the value of every entry. // // Since this only locks the mutex once, it is not possible for a value // to get moved from the idle list to the notified list during the // operation, which would otherwise result in some value being listed // twice. let mut ptrs = Vec::with_capacity(self.len()); { let mut lock = self.lists.lock(); get_ptrs(&mut lock.idle, &mut ptrs); get_ptrs(&mut lock.notified, &mut ptrs); } debug_assert_eq!(ptrs.len(), ptrs.capacity()); for ptr in ptrs { // Safety: When we grabbed the pointers, the entries were in one of // the two lists. This means that their value was valid at the time, // and it must still be valid because we are the IdleNotifiedSet, // and only we can remove an entry from the two lists. (It's // possible that an entry is moved from one list to the other during // this loop, but that is ok.) func(unsafe { &mut *ptr }); } } /// Remove all entries in both lists, applying some function to each element. /// /// The closure is called on all elements even if it panics. Having it panic /// twice is a double-panic, and will abort the application. pub(crate) fn drain(&mut self, func: F) { if self.length == 0 { // Fast path. return; } self.length = 0; // The LinkedList is not cleared on panic, so we use a bomb to clear it. // // This value has the invariant that any entry in its `all_entries` list // has `my_list` set to `Neither` and that the value has not yet been // dropped. struct AllEntries { all_entries: LinkedList, func: F, } impl AllEntries { fn pop_next(&mut self) -> bool { if let Some(entry) = self.all_entries.pop_back() { // Safety: We just took this value from the list, so we can // destroy the value in the entry. entry .value .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) }); true } else { false } } } impl Drop for AllEntries { fn drop(&mut self) { while self.pop_next() {} } } let mut all_entries = AllEntries { all_entries: LinkedList::new(), func, }; // Atomically move all entries to the new linked list in the AllEntries // object. { let mut lock = self.lists.lock(); unsafe { // Safety: We are holding the lock and `all_entries` is a new // LinkedList. move_to_new_list(&mut lock.idle, &mut all_entries.all_entries); move_to_new_list(&mut lock.notified, &mut all_entries.all_entries); } } // Keep destroying entries in the list until it is empty. // // If the closure panics, then the destructor of the `AllEntries` bomb // ensures that we keep running the destructor on the remaining values. // A second panic will abort the program. while all_entries.pop_next() {} } } /// # Safety /// /// The mutex for the entries must be held, and the target list must be such /// that setting `my_list` to `Neither` is ok. unsafe fn move_to_new_list(from: &mut LinkedList, to: &mut LinkedList) { while let Some(entry) = from.pop_back() { entry.my_list.with_mut(|ptr| { *ptr = List::Neither; }); to.push_front(entry); } } impl<'a, T> EntryInOneOfTheLists<'a, T> { /// Remove this entry from the list it is in, returning the value associated /// with the entry. /// /// This consumes the value, since it is no longer guaranteed to be in a /// list. pub(crate) fn remove(self) -> T { self.set.length -= 1; { let mut lock = self.set.lists.lock(); // Safety: We are holding the lock so there is no race, and we will // remove the entry afterwards to uphold invariants. let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe { let old_my_list = *ptr; *ptr = List::Neither; old_my_list }); let list = match old_my_list { List::Idle => &mut lock.idle, List::Notified => &mut lock.notified, // An entry in one of the lists is in one of the lists. List::Neither => unreachable!(), }; unsafe { // Safety: We just checked that the entry is in this particular // list. list.remove(ListEntry::as_raw(&self.entry)).unwrap(); } } // By setting `my_list` to `Neither`, we have taken ownership of the // value. We return it to the caller. // // Safety: We have a mutable reference to the `IdleNotifiedSet` that // owns this entry, so we can use its permission to access the value. self.entry .value .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) }) } /// Access the value in this entry together with a context for its waker. pub(crate) fn with_value_and_context(&mut self, func: F) -> U where F: FnOnce(&mut T, &mut Context<'_>) -> U, T: 'static, { let waker = waker_ref(&self.entry); let mut context = Context::from_waker(&waker); // Safety: We have a mutable reference to the `IdleNotifiedSet` that // owns this entry, so we can use its permission to access the value. self.entry .value .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) }) } } impl Drop for IdleNotifiedSet { fn drop(&mut self) { // Clear both lists. self.drain(drop); #[cfg(debug_assertions)] if !std::thread::panicking() { let lock = self.lists.lock(); assert!(lock.idle.is_empty()); assert!(lock.notified.is_empty()); } } } impl Wake for ListEntry { fn wake_by_ref(me: &Arc) { let mut lock = me.parent.lock(); // Safety: We are holding the lock and we will update the lists to // maintain invariants. let old_my_list = me.my_list.with_mut(|ptr| unsafe { let old_my_list = *ptr; if old_my_list == List::Idle { *ptr = List::Notified; } old_my_list }); if old_my_list == List::Idle { // We move ourself to the notified list. let me = unsafe { // Safety: We just checked that we are in this particular list. lock.idle.remove(ListEntry::as_raw(me)).unwrap() }; lock.notified.push_front(me); if let Some(waker) = lock.waker.take() { drop(lock); waker.wake(); } } } fn wake(me: Arc) { Self::wake_by_ref(&me) } } /// # Safety /// /// `ListEntry` is forced to be !Unpin. unsafe impl linked_list::Link for ListEntry { type Handle = Arc>; type Target = ListEntry; fn as_raw(handle: &Self::Handle) -> NonNull> { let ptr: *const ListEntry = Arc::as_ptr(handle); // Safety: We can't get a null pointer from `Arc::as_ptr`. unsafe { NonNull::new_unchecked(ptr as *mut ListEntry) } } unsafe fn from_raw(ptr: NonNull>) -> Arc> { Arc::from_raw(ptr.as_ptr()) } unsafe fn pointers( target: NonNull>, ) -> NonNull>> { ListEntry::addr_of_pointers(target) } } #[cfg(all(test, not(loom)))] mod tests { use crate::runtime::Builder; use crate::task::JoinSet; // A test that runs under miri. // // https://github.com/tokio-rs/tokio/pull/5693 #[test] fn join_set_test() { let rt = Builder::new_current_thread().build().unwrap(); let mut set = JoinSet::new(); set.spawn_on(futures::future::ready(()), rt.handle()); rt.block_on(set.join_next()).unwrap().unwrap(); } }