use std::cell::UnsafeCell; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; /// A "lock" around data `D`, which employs a *helping* strategy. /// /// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being /// invoked on only a single thread at a time (2) `poll` being invoked at least /// once after each `unpark` (unless the future has completed). pub(crate) struct UnparkMutex { // The state of task execution (state machine described below) status: AtomicUsize, // The actual task data, accessible only in the POLLING state inner: UnsafeCell>, } // `UnparkMutex` functions in many ways like a `Mutex`, except that on // acquisition failure, the current lock holder performs the desired work -- // re-polling. // // As such, these impls mirror those for `Mutex`. In particular, a reference // to `UnparkMutex` can be used to gain `&mut` access to the inner data, which // must therefore be `Send`. unsafe impl Send for UnparkMutex {} unsafe impl Sync for UnparkMutex {} // There are four possible task states, listed below with their possible // transitions: // The task is blocked, waiting on an event const WAITING: usize = 0; // --> POLLING // The task is actively being polled by a thread; arrival of additional events // of interest should move it to the REPOLL state const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE // The task is actively being polled, but will need to be re-polled upon // completion to ensure that all events were observed. const REPOLL: usize = 2; // --> POLLING // The task has finished executing (either successfully or with an error/panic) const COMPLETE: usize = 3; // No transitions out impl UnparkMutex { pub(crate) fn new() -> Self { Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) } } /// Attempt to "notify" the mutex that a poll should occur. /// /// An `Ok` result indicates that the `POLLING` state has been entered, and /// the caller can proceed to poll the future. An `Err` result indicates /// that polling is not necessary (because the task is finished or the /// polling has been delegated). pub(crate) fn notify(&self) -> Result { let mut status = self.status.load(SeqCst); loop { match status { // The task is idle, so try to run it immediately. WAITING => { match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) { Ok(_) => { let data = unsafe { // SAFETY: we've ensured mutual exclusion via // the status protocol; we are the only thread // that has transitioned to the POLLING state, // and we won't transition back to QUEUED until // the lock is "released" by this thread. See // the protocol diagram above. (*self.inner.get()).take().unwrap() }; return Ok(data); } Err(cur) => status = cur, } } // The task is being polled, so we need to record that it should // be *repolled* when complete. POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) { Ok(_) => return Err(()), Err(cur) => status = cur, }, // The task is already scheduled for polling, or is complete, so // we've got nothing to do. _ => return Err(()), } } } /// Alert the mutex that polling is about to begin, clearing any accumulated /// re-poll requests. /// /// # Safety /// /// Callable only from the `POLLING`/`REPOLL` states, i.e. between /// successful calls to `notify` and `wait`/`complete`. pub(crate) unsafe fn start_poll(&self) { self.status.store(POLLING, SeqCst); } /// Alert the mutex that polling completed with `Pending`. /// /// # Safety /// /// Callable only from the `POLLING`/`REPOLL` states, i.e. between /// successful calls to `notify` and `wait`/`complete`. pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> { *self.inner.get() = Some(data); match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) { // no unparks came in while we were running Ok(_) => Ok(()), // guaranteed to be in REPOLL state; just clobber the // state and run again. Err(status) => { assert_eq!(status, REPOLL); self.status.store(POLLING, SeqCst); Err((*self.inner.get()).take().unwrap()) } } } /// Alert the mutex that the task has completed execution and should not be /// notified again. /// /// # Safety /// /// Callable only from the `POLLING`/`REPOLL` states, i.e. between /// successful calls to `notify` and `wait`/`complete`. pub(crate) unsafe fn complete(&self) { self.status.store(COMPLETE, SeqCst); } }