diff options
Diffstat (limited to 'third_party/rust/tokio/src/sync/notify.rs')
-rw-r--r-- | third_party/rust/tokio/src/sync/notify.rs | 740 |
1 files changed, 740 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/notify.rs b/third_party/rust/tokio/src/sync/notify.rs new file mode 100644 index 0000000000..83d0de4fbe --- /dev/null +++ b/third_party/rust/tokio/src/sync/notify.rs @@ -0,0 +1,740 @@ +// Allow `unreachable_pub` warnings when sync is not enabled +// due to the usage of `Notify` within the `rt` feature set. +// When this module is compiled with `sync` enabled we will warn on +// this lint. When `rt` is enabled we use `pub(crate)` which +// triggers this warning but it is safe to ignore in this case. +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] + +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; + +use std::cell::UnsafeCell; +use std::future::Future; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::atomic::Ordering::SeqCst; +use std::task::{Context, Poll, Waker}; + +type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; + +/// Notifies a single task to wake up. +/// +/// `Notify` provides a basic mechanism to notify a single task of an event. +/// `Notify` itself does not carry any data. Instead, it is to be used to signal +/// another task to perform an operation. +/// +/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. +/// [`notified().await`] waits for a permit to become available, and [`notify_one()`] +/// sets a permit **if there currently are no available permits**. +/// +/// The synchronization details of `Notify` are similar to +/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] +/// value contains a single permit. [`notified().await`] waits for the permit to +/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the +/// permit, waking a pending task if there is one. +/// +/// If `notify_one()` is called **before** `notified().await`, then the next call to +/// `notified().await` will complete immediately, consuming the permit. Any +/// subsequent calls to `notified().await` will wait for a new permit. +/// +/// If `notify_one()` is called **multiple** times before `notified().await`, only a +/// **single** permit is stored. The next call to `notified().await` will +/// complete immediately, but the one after will wait for a new permit. +/// +/// # Examples +/// +/// Basic usage. +/// +/// ``` +/// use tokio::sync::Notify; +/// use std::sync::Arc; +/// +/// #[tokio::main] +/// async fn main() { +/// let notify = Arc::new(Notify::new()); +/// let notify2 = notify.clone(); +/// +/// let handle = tokio::spawn(async move { +/// notify2.notified().await; +/// println!("received notification"); +/// }); +/// +/// println!("sending notification"); +/// notify.notify_one(); +/// +/// // Wait for task to receive notification. +/// handle.await.unwrap(); +/// } +/// ``` +/// +/// Unbound mpsc channel. +/// +/// ``` +/// use tokio::sync::Notify; +/// +/// use std::collections::VecDeque; +/// use std::sync::Mutex; +/// +/// struct Channel<T> { +/// values: Mutex<VecDeque<T>>, +/// notify: Notify, +/// } +/// +/// impl<T> Channel<T> { +/// pub fn send(&self, value: T) { +/// self.values.lock().unwrap() +/// .push_back(value); +/// +/// // Notify the consumer a value is available +/// self.notify.notify_one(); +/// } +/// +/// pub async fn recv(&self) -> T { +/// loop { +/// // Drain values +/// if let Some(value) = self.values.lock().unwrap().pop_front() { +/// return value; +/// } +/// +/// // Wait for values to be available +/// self.notify.notified().await; +/// } +/// } +/// } +/// ``` +/// +/// [park]: std::thread::park +/// [unpark]: std::thread::Thread::unpark +/// [`notified().await`]: Notify::notified() +/// [`notify_one()`]: Notify::notify_one() +/// [`Semaphore`]: crate::sync::Semaphore +#[derive(Debug)] +pub struct Notify { + // This uses 2 bits to store one of `EMPTY`, + // `WAITING` or `NOTIFIED`. The rest of the bits + // are used to store the number of times `notify_waiters` + // was called. + state: AtomicUsize, + waiters: Mutex<WaitList>, +} + +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, +} + +#[derive(Debug)] +#[repr(C)] // required by `linked_list::Link` impl +struct Waiter { + /// Intrusive linked-list pointers. + pointers: linked_list::Pointers<Waiter>, + + /// Waiting task's waker. + waker: Option<Waker>, + + /// `true` if the notification has been assigned to this waiter. + notified: Option<NotificationType>, + + /// Should not be `Unpin`. + _p: PhantomPinned, +} + +/// Future returned from [`Notify::notified()`] +#[derive(Debug)] +pub struct Notified<'a> { + /// The `Notify` being received on. + notify: &'a Notify, + + /// The current state of the receiving process. + state: State, + + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell<Waiter>, +} + +unsafe impl<'a> Send for Notified<'a> {} +unsafe impl<'a> Sync for Notified<'a> {} + +#[derive(Debug)] +enum State { + Init(usize), + Waiting, + Done, +} + +const NOTIFY_WAITERS_SHIFT: usize = 2; +const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1; +const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK; + +/// Initial "idle" state. +const EMPTY: usize = 0; + +/// One or more threads are currently waiting to be notified. +const WAITING: usize = 1; + +/// Pending notification. +const NOTIFIED: usize = 2; + +fn set_state(data: usize, state: usize) -> usize { + (data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK) +} + +fn get_state(data: usize) -> usize { + data & STATE_MASK +} + +fn get_num_notify_waiters_calls(data: usize) -> usize { + (data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT +} + +fn inc_num_notify_waiters_calls(data: usize) -> usize { + data + (1 << NOTIFY_WAITERS_SHIFT) +} + +fn atomic_inc_num_notify_waiters_calls(data: &AtomicUsize) { + data.fetch_add(1 << NOTIFY_WAITERS_SHIFT, SeqCst); +} + +impl Notify { + /// Create a new `Notify`, initialized without a permit. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// let notify = Notify::new(); + /// ``` + pub fn new() -> Notify { + Notify { + state: AtomicUsize::new(0), + waiters: Mutex::new(LinkedList::new()), + } + } + + /// Create a new `Notify`, initialized without a permit. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// static NOTIFY: Notify = Notify::const_new(); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new() -> Notify { + Notify { + state: AtomicUsize::new(0), + waiters: Mutex::const_new(LinkedList::new()), + } + } + + /// Wait for a notification. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn notified(&self); + /// ``` + /// + /// Each `Notify` value holds a single permit. If a permit is available from + /// an earlier call to [`notify_one()`], then `notified().await` will complete + /// immediately, consuming that permit. Otherwise, `notified().await` waits + /// for a permit to be made available by the next call to `notify_one()`. + /// + /// [`notify_one()`]: Notify::notify_one + /// + /// # Cancel safety + /// + /// This method uses a queue to fairly distribute notifications in the order + /// they were requested. Cancelling a call to `notified` makes you lose your + /// place in the queue. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let notify = Arc::new(Notify::new()); + /// let notify2 = notify.clone(); + /// + /// tokio::spawn(async move { + /// notify2.notified().await; + /// println!("received notification"); + /// }); + /// + /// println!("sending notification"); + /// notify.notify_one(); + /// } + /// ``` + pub fn notified(&self) -> Notified<'_> { + // we load the number of times notify_waiters + // was called and store that in our initial state + let state = self.state.load(SeqCst); + Notified { + notify: self, + state: State::Init(state >> NOTIFY_WAITERS_SHIFT), + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + notified: None, + _p: PhantomPinned, + }), + } + } + + /// Notifies a waiting task. + /// + /// If a task is currently waiting, that task is notified. Otherwise, a + /// permit is stored in this `Notify` value and the **next** call to + /// [`notified().await`] will complete immediately consuming the permit made + /// available by this call to `notify_one()`. + /// + /// At most one permit may be stored by `Notify`. Many sequential calls to + /// `notify_one` will result in a single permit being stored. The next call to + /// `notified().await` will complete immediately, but the one after that + /// will wait. + /// + /// [`notified().await`]: Notify::notified() + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let notify = Arc::new(Notify::new()); + /// let notify2 = notify.clone(); + /// + /// tokio::spawn(async move { + /// notify2.notified().await; + /// println!("received notification"); + /// }); + /// + /// println!("sending notification"); + /// notify.notify_one(); + /// } + /// ``` + // Alias for old name in 0.x + #[cfg_attr(docsrs, doc(alias = "notify"))] + pub fn notify_one(&self) { + // Load the current state + let mut curr = self.state.load(SeqCst); + + // If the state is `EMPTY`, transition to `NOTIFIED` and return. + while let EMPTY | NOTIFIED = get_state(curr) { + // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A + // happens-before synchronization must happen between this atomic + // operation and a task calling `notified().await`. + let new = set_state(curr, NOTIFIED); + let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst); + + match res { + // No waiters, no further work to do + Ok(_) => return, + Err(actual) => { + curr = actual; + } + } + } + + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + curr = self.state.load(SeqCst); + + if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) { + drop(waiters); + waker.wake(); + } + } + + /// Notifies all waiting tasks. + /// + /// If a task is currently waiting, that task is notified. Unlike with + /// `notify_one()`, no permit is stored to be used by the next call to + /// `notified().await`. The purpose of this method is to notify all + /// already registered waiters. Registering for notification is done by + /// acquiring an instance of the `Notified` future via calling `notified()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let notify = Arc::new(Notify::new()); + /// let notify2 = notify.clone(); + /// + /// let notified1 = notify.notified(); + /// let notified2 = notify.notified(); + /// + /// let handle = tokio::spawn(async move { + /// println!("sending notifications"); + /// notify2.notify_waiters(); + /// }); + /// + /// notified1.await; + /// notified2.await; + /// println!("received notifications"); + /// } + /// ``` + pub fn notify_waiters(&self) { + let mut wakers = WakeList::new(); + + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let curr = self.state.load(SeqCst); + + if let EMPTY | NOTIFIED = get_state(curr) { + // There are no waiting tasks. All we need to do is increment the + // number of times this method was called. + atomic_inc_num_notify_waiters_calls(&self.state); + return; + } + + // At this point, it is guaranteed that the state will not + // concurrently change, as holding the lock is required to + // transition **out** of `WAITING`. + 'outer: loop { + while wakers.can_push() { + match waiters.pop_back() { + Some(mut waiter) => { + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(waiter.notified.is_none()); + + waiter.notified = Some(NotificationType::AllWaiters); + + if let Some(waker) = waiter.waker.take() { + wakers.push(waker); + } + } + None => { + break 'outer; + } + } + } + + drop(waiters); + + wakers.wake_all(); + + // Acquire the lock again. + waiters = self.waiters.lock(); + } + + // All waiters will be notified, the state must be transitioned to + // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be + // held, a `store` is sufficient. + let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY); + self.state.store(new, SeqCst); + + // Release the lock before notifying + drop(waiters); + + wakers.wake_all(); + } +} + +impl Default for Notify { + fn default() -> Notify { + Notify::new() + } +} + +fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> { + loop { + match get_state(curr) { + EMPTY | NOTIFIED => { + let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst); + + match res { + Ok(_) => return None, + Err(actual) => { + let actual_state = get_state(actual); + assert!(actual_state == EMPTY || actual_state == NOTIFIED); + state.store(set_state(actual, NOTIFIED), SeqCst); + return None; + } + } + } + WAITING => { + // At this point, it is guaranteed that the state will not + // concurrently change as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get a pending waiter + let mut waiter = waiters.pop_back().unwrap(); + + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(waiter.notified.is_none()); + + waiter.notified = Some(NotificationType::OneWaiter); + let waker = waiter.waker.take(); + + if waiters.is_empty() { + // As this the **final** waiter in the list, the state + // must be transitioned to `EMPTY`. As transitioning + // **from** `WAITING` requires the lock to be held, a + // `store` is sufficient. + state.store(set_state(curr, EMPTY), SeqCst); + } + + return waker; + } + _ => unreachable!(), + } + } +} + +// ===== impl Notified ===== + +impl Notified<'_> { + /// A custom `project` implementation is used in place of `pin-project-lite` + /// as a custom drop implementation is needed. + fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) { + unsafe { + // Safety: both `notify` and `state` are `Unpin`. + + is_unpin::<&Notify>(); + is_unpin::<AtomicUsize>(); + + let me = self.get_unchecked_mut(); + (me.notify, &mut me.state, &me.waiter) + } + } +} + +impl Future for Notified<'_> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + use State::*; + + let (notify, state, waiter) = self.project(); + + loop { + match *state { + Init(initial_notify_waiters_calls) => { + let curr = notify.state.load(SeqCst); + + // Optimistically try acquiring a pending notification + let res = notify.state.compare_exchange( + set_state(curr, NOTIFIED), + set_state(curr, EMPTY), + SeqCst, + SeqCst, + ); + + if res.is_ok() { + // Acquired the notification + *state = Done; + return Poll::Ready(()); + } + + // Clone the waker before locking, a waker clone can be + // triggering arbitrary code. + let waker = cx.waker().clone(); + + // Acquire the lock and attempt to transition to the waiting + // state. + let mut waiters = notify.waiters.lock(); + + // Reload the state with the lock held + let mut curr = notify.state.load(SeqCst); + + // if notify_waiters has been called after the future + // was created, then we are done + if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls { + *state = Done; + return Poll::Ready(()); + } + + // Transition the state to WAITING. + loop { + match get_state(curr) { + EMPTY => { + // Transition to WAITING + let res = notify.state.compare_exchange( + set_state(curr, EMPTY), + set_state(curr, WAITING), + SeqCst, + SeqCst, + ); + + if let Err(actual) = res { + assert_eq!(get_state(actual), NOTIFIED); + curr = actual; + } else { + break; + } + } + WAITING => break, + NOTIFIED => { + // Try consuming the notification + let res = notify.state.compare_exchange( + set_state(curr, NOTIFIED), + set_state(curr, EMPTY), + SeqCst, + SeqCst, + ); + + match res { + Ok(_) => { + // Acquired the notification + *state = Done; + return Poll::Ready(()); + } + Err(actual) => { + assert_eq!(get_state(actual), EMPTY); + curr = actual; + } + } + } + _ => unreachable!(), + } + } + + // Safety: called while locked. + unsafe { + (*waiter.get()).waker = Some(waker); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + + *state = Waiting; + + return Poll::Pending; + } + Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = notify.waiters.lock(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.notified.is_some() { + // Our waker has been notified. Reset the fields and + // remove it from the list. + w.waker = None; + w.notified = None; + + *state = Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + Done => { + return Poll::Ready(()); + } + } + } + } +} + +impl Drop for Notified<'_> { + fn drop(&mut self) { + use State::*; + + // Safety: The type only transitions to a "Waiting" state when pinned. + let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() }; + + // This is where we ensure safety. The `Notified` value is being + // dropped, which means we must ensure that the waiter entry is no + // longer stored in the linked list. + if let Waiting = *state { + let mut waiters = notify.waiters.lock(); + let mut notify_state = notify.state.load(SeqCst); + + // remove the entry from the list (if not already removed) + // + // safety: the waiter is only added to `waiters` by virtue of it + // being the only `LinkedList` available to the type. + unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) }; + + if waiters.is_empty() { + if let WAITING = get_state(notify_state) { + notify_state = set_state(notify_state, EMPTY); + notify.state.store(notify_state, SeqCst); + } + } + + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. + // + // Safety: with the entry removed from the linked list, there can be + // no concurrent access to the entry + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { + if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { + drop(waiters); + waker.wake(); + } + } + } + } +} + +/// # Safety +/// +/// `Waiter` is forced to be !Unpin. +unsafe impl linked_list::Link for Waiter { + type Handle = NonNull<Waiter>; + type Target = Waiter; + + fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { + *handle + } + + unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { + ptr + } + + unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { + target.cast() + } +} + +fn is_unpin<T: Unpin>() {} |