summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-utils/src/sync/parker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/crossbeam-utils/src/sync/parker.rs')
-rw-r--r--third_party/rust/crossbeam-utils/src/sync/parker.rs413
1 files changed, 413 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/sync/parker.rs b/third_party/rust/crossbeam-utils/src/sync/parker.rs
new file mode 100644
index 0000000000..e791c44852
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/sync/parker.rs
@@ -0,0 +1,413 @@
+use crate::primitive::sync::atomic::AtomicUsize;
+use crate::primitive::sync::{Arc, Condvar, Mutex};
+use core::sync::atomic::Ordering::SeqCst;
+use std::fmt;
+use std::marker::PhantomData;
+use std::time::{Duration, Instant};
+
+/// A thread parking primitive.
+///
+/// Conceptually, each `Parker` has an associated token which is initially not present:
+///
+/// * The [`park`] method blocks the current thread unless or until the token is available, at
+/// which point it automatically consumes the token.
+///
+/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
+/// a specified maximum time.
+///
+/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
+/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
+/// returning immediately.
+///
+/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
+/// [`park`] and [`unpark`].
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_utils::sync::Parker;
+///
+/// let p = Parker::new();
+/// let u = p.unparker().clone();
+///
+/// // Make the token available.
+/// u.unpark();
+/// // Wakes up immediately and consumes the token.
+/// p.park();
+///
+/// thread::spawn(move || {
+/// thread::sleep(Duration::from_millis(500));
+/// u.unpark();
+/// });
+///
+/// // Wakes up when `u.unpark()` provides the token.
+/// p.park();
+/// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+/// ```
+///
+/// [`park`]: Parker::park
+/// [`park_timeout`]: Parker::park_timeout
+/// [`park_deadline`]: Parker::park_deadline
+/// [`unpark`]: Unparker::unpark
+pub struct Parker {
+ unparker: Unparker,
+ _marker: PhantomData<*const ()>,
+}
+
+unsafe impl Send for Parker {}
+
+impl Default for Parker {
+ fn default() -> Self {
+ Self {
+ unparker: Unparker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ lock: Mutex::new(()),
+ cvar: Condvar::new(),
+ }),
+ },
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl Parker {
+ /// Creates a new `Parker`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// ```
+ ///
+ pub fn new() -> Parker {
+ Self::default()
+ }
+
+ /// Blocks the current thread until the token is made available.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// // Make the token available.
+ /// u.unpark();
+ ///
+ /// // Wakes up immediately and consumes the token.
+ /// p.park();
+ /// ```
+ pub fn park(&self) {
+ self.unparker.inner.park(None);
+ }
+
+ /// Blocks the current thread until the token is made available, but only for a limited time.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::Duration;
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_timeout(Duration::from_millis(500));
+ /// ```
+ pub fn park_timeout(&self, timeout: Duration) {
+ self.park_deadline(Instant::now() + timeout)
+ }
+
+ /// Blocks the current thread until the token is made available, or until a certain deadline.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::{Duration, Instant};
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let deadline = Instant::now() + Duration::from_millis(500);
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_deadline(deadline);
+ /// ```
+ pub fn park_deadline(&self, deadline: Instant) {
+ self.unparker.inner.park(Some(deadline))
+ }
+
+ /// Returns a reference to an associated [`Unparker`].
+ ///
+ /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// // Make the token available.
+ /// u.unpark();
+ /// // Wakes up immediately and consumes the token.
+ /// p.park();
+ /// ```
+ ///
+ /// [`park`]: Parker::park
+ /// [`park_timeout`]: Parker::park_timeout
+ pub fn unparker(&self) -> &Unparker {
+ &self.unparker
+ }
+
+ /// Converts a `Parker` into a raw pointer.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let raw = Parker::into_raw(p);
+ /// # let _ = unsafe { Parker::from_raw(raw) };
+ /// ```
+ pub fn into_raw(this: Parker) -> *const () {
+ Unparker::into_raw(this.unparker)
+ }
+
+ /// Converts a raw pointer into a `Parker`.
+ ///
+ /// # Safety
+ ///
+ /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let raw = Parker::into_raw(p);
+ /// let p = unsafe { Parker::from_raw(raw) };
+ /// ```
+ pub unsafe fn from_raw(ptr: *const ()) -> Parker {
+ Parker {
+ unparker: Unparker::from_raw(ptr),
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl fmt::Debug for Parker {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Parker { .. }")
+ }
+}
+
+/// Unparks a thread parked by the associated [`Parker`].
+pub struct Unparker {
+ inner: Arc<Inner>,
+}
+
+unsafe impl Send for Unparker {}
+unsafe impl Sync for Unparker {}
+
+impl Unparker {
+ /// Atomically makes the token available if it is not already.
+ ///
+ /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
+ /// any.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(500));
+ /// u.unpark();
+ /// });
+ ///
+ /// // Wakes up when `u.unpark()` provides the token.
+ /// p.park();
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ /// ```
+ ///
+ /// [`park`]: Parker::park
+ /// [`park_timeout`]: Parker::park_timeout
+ pub fn unpark(&self) {
+ self.inner.unpark()
+ }
+
+ /// Converts an `Unparker` into a raw pointer.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::{Parker, Unparker};
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ /// let raw = Unparker::into_raw(u);
+ /// # let _ = unsafe { Unparker::from_raw(raw) };
+ /// ```
+ pub fn into_raw(this: Unparker) -> *const () {
+ Arc::into_raw(this.inner).cast::<()>()
+ }
+
+ /// Converts a raw pointer into an `Unparker`.
+ ///
+ /// # Safety
+ ///
+ /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::{Parker, Unparker};
+ ///
+ /// let p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// let raw = Unparker::into_raw(u);
+ /// let u = unsafe { Unparker::from_raw(raw) };
+ /// ```
+ pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
+ Unparker {
+ inner: Arc::from_raw(ptr.cast::<Inner>()),
+ }
+ }
+}
+
+impl fmt::Debug for Unparker {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Unparker { .. }")
+ }
+}
+
+impl Clone for Unparker {
+ fn clone(&self) -> Unparker {
+ Unparker {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+struct Inner {
+ state: AtomicUsize,
+ lock: Mutex<()>,
+ cvar: Condvar,
+}
+
+impl Inner {
+ fn park(&self, deadline: Option<Instant>) {
+ // If we were previously notified then we consume this notification and return quickly.
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return;
+ }
+
+ // If the timeout is zero, then there is no need to actually block.
+ if let Some(deadline) = deadline {
+ if deadline <= Instant::now() {
+ return;
+ }
+ }
+
+ // Otherwise we need to coordinate going to sleep.
+ let mut m = self.lock.lock().unwrap();
+
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ // Consume this notification to avoid spurious wakeups in the next park.
+ Err(NOTIFIED) => {
+ // We must read `state` here, even though we know it will be `NOTIFIED`. This is
+ // because `unpark` may have been called again since we read `NOTIFIED` in the
+ // `compare_exchange` above. We must perform an acquire operation that synchronizes
+ // with that `unpark` to observe any writes it made before the call to `unpark`. To
+ // do that we must read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ }
+ Err(n) => panic!("inconsistent park_timeout state: {}", n),
+ }
+
+ loop {
+ // Block the current thread on the conditional variable.
+ m = match deadline {
+ None => self.cvar.wait(m).unwrap(),
+ Some(deadline) => {
+ let now = Instant::now();
+ if now < deadline {
+ // We could check for a timeout here, in the return value of wait_timeout,
+ // but in the case that a timeout and an unpark arrive simultaneously, we
+ // prefer to report the former.
+ self.cvar.wait_timeout(m, deadline - now).unwrap().0
+ } else {
+ // We've timed out; swap out the state back to empty on our way out
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED | PARKED => return,
+ n => panic!("inconsistent park_timeout state: {}", n),
+ };
+ }
+ }
+ };
+
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // got a notification
+ return;
+ }
+
+ // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
+ // in the branch above, when we discover the deadline is in the past
+ }
+ }
+
+ pub(crate) fn unpark(&self) {
+ // To ensure the unparked thread will observe any writes we made before this call, we must
+ // perform a release operation that `park` can synchronize with. To do that we must write
+ // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
+ // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => return, // no one was waiting
+ NOTIFIED => return, // already unparked
+ PARKED => {} // gotta go wake someone up
+ _ => panic!("inconsistent state in unpark"),
+ }
+
+ // There is a period between when the parked thread sets `state` to `PARKED` (or last
+ // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
+ // If we were to notify during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
+ // stage so we can acquire `lock` to wait until it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
+ // it doesn't get woken only to have to wait for us to release `lock`.
+ drop(self.lock.lock().unwrap());
+ self.cvar.notify_one();
+ }
+}