diff options
Diffstat (limited to 'third_party/rust/crossbeam-utils/src/sync/parker.rs')
-rw-r--r-- | third_party/rust/crossbeam-utils/src/sync/parker.rs | 413 |
1 files changed, 413 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/sync/parker.rs b/third_party/rust/crossbeam-utils/src/sync/parker.rs new file mode 100644 index 0000000000..e791c44852 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/sync/parker.rs @@ -0,0 +1,413 @@ +use crate::primitive::sync::atomic::AtomicUsize; +use crate::primitive::sync::{Arc, Condvar, Mutex}; +use core::sync::atomic::Ordering::SeqCst; +use std::fmt; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +/// A thread parking primitive. +/// +/// Conceptually, each `Parker` has an associated token which is initially not present: +/// +/// * The [`park`] method blocks the current thread unless or until the token is available, at +/// which point it automatically consumes the token. +/// +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. +/// +/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the +/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call +/// returning immediately. +/// +/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using +/// [`park`] and [`unpark`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::time::Duration; +/// use crossbeam_utils::sync::Parker; +/// +/// let p = Parker::new(); +/// let u = p.unparker().clone(); +/// +/// // Make the token available. +/// u.unpark(); +/// // Wakes up immediately and consumes the token. +/// p.park(); +/// +/// thread::spawn(move || { +/// thread::sleep(Duration::from_millis(500)); +/// u.unpark(); +/// }); +/// +/// // Wakes up when `u.unpark()` provides the token. +/// p.park(); +/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +/// ``` +/// +/// [`park`]: Parker::park +/// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline +/// [`unpark`]: Unparker::unpark +pub struct Parker { + unparker: Unparker, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for Parker {} + +impl Default for Parker { + fn default() -> Self { + Self { + unparker: Unparker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }), + }, + _marker: PhantomData, + } + } +} + +impl Parker { + /// Creates a new `Parker`. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// ``` + /// + pub fn new() -> Parker { + Self::default() + } + + /// Blocks the current thread until the token is made available. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + pub fn park(&self) { + self.unparker.inner.park(None); + } + + /// Blocks the current thread until the token is made available, but only for a limited time. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_timeout(Duration::from_millis(500)); + /// ``` + pub fn park_timeout(&self, timeout: Duration) { + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) { + self.unparker.inner.park(Some(deadline)) + } + + /// Returns a reference to an associated [`Unparker`]. + /// + /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// // Make the token available. + /// u.unpark(); + /// // Wakes up immediately and consumes the token. + /// p.park(); + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unparker(&self) -> &Unparker { + &self.unparker + } + + /// Converts a `Parker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// # let _ = unsafe { Parker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Parker) -> *const () { + Unparker::into_raw(this.unparker) + } + + /// Converts a raw pointer into a `Parker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let raw = Parker::into_raw(p); + /// let p = unsafe { Parker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Parker { + Parker { + unparker: Unparker::from_raw(ptr), + _marker: PhantomData, + } + } +} + +impl fmt::Debug for Parker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Parker { .. }") + } +} + +/// Unparks a thread parked by the associated [`Parker`]. +pub struct Unparker { + inner: Arc<Inner>, +} + +unsafe impl Send for Unparker {} +unsafe impl Sync for Unparker {} + +impl Unparker { + /// Atomically makes the token available if it is not already. + /// + /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is + /// any. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(500)); + /// u.unpark(); + /// }); + /// + /// // Wakes up when `u.unpark()` provides the token. + /// p.park(); + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// [`park`]: Parker::park + /// [`park_timeout`]: Parker::park_timeout + pub fn unpark(&self) { + self.inner.unpark() + } + + /// Converts an `Unparker` into a raw pointer. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// let raw = Unparker::into_raw(u); + /// # let _ = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub fn into_raw(this: Unparker) -> *const () { + Arc::into_raw(this.inner).cast::<()>() + } + + /// Converts a raw pointer into an `Unparker`. + /// + /// # Safety + /// + /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::sync::{Parker, Unparker}; + /// + /// let p = Parker::new(); + /// let u = p.unparker().clone(); + /// + /// let raw = Unparker::into_raw(u); + /// let u = unsafe { Unparker::from_raw(raw) }; + /// ``` + pub unsafe fn from_raw(ptr: *const ()) -> Unparker { + Unparker { + inner: Arc::from_raw(ptr.cast::<Inner>()), + } + } +} + +impl fmt::Debug for Unparker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Unparker { .. }") + } +} + +impl Clone for Unparker { + fn clone(&self) -> Unparker { + Unparker { + inner: self.inner.clone(), + } + } +} + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +struct Inner { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Inner { + fn park(&self, deadline: Option<Instant>) { + // If we were previously notified then we consume this notification and return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + // If the timeout is zero, then there is no need to actually block. + if let Some(deadline) = deadline { + if deadline <= Instant::now() { + return; + } + } + + // Otherwise we need to coordinate going to sleep. + let mut m = self.lock.lock().unwrap(); + + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + // Consume this notification to avoid spurious wakeups in the next park. + Err(NOTIFIED) => { + // We must read `state` here, even though we know it will be `NOTIFIED`. This is + // because `unpark` may have been called again since we read `NOTIFIED` in the + // `compare_exchange` above. We must perform an acquire operation that synchronizes + // with that `unpark` to observe any writes it made before the call to `unpark`. To + // do that we must read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } + Err(n) => panic!("inconsistent park_timeout state: {}", n), + } + + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, + n => panic!("inconsistent park_timeout state: {}", n), + }; + } + } + }; + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past + } + } + + pub(crate) fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before this call, we must + // perform a release operation that `park` can synchronize with. To do that we must write + // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather + // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to `PARKED` (or last + // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. + // If we were to notify during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this + // stage so we can acquire `lock` to wait until it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes + // it doesn't get woken only to have to wait for us to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one(); + } +} |