summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-utils-0.6.6/src/sync/parker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/crossbeam-utils-0.6.6/src/sync/parker.rs')
-rw-r--r--third_party/rust/crossbeam-utils-0.6.6/src/sync/parker.rs311
1 files changed, 311 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils-0.6.6/src/sync/parker.rs b/third_party/rust/crossbeam-utils-0.6.6/src/sync/parker.rs
new file mode 100644
index 0000000000..506db8e393
--- /dev/null
+++ b/third_party/rust/crossbeam-utils-0.6.6/src/sync/parker.rs
@@ -0,0 +1,311 @@
+use std::fmt;
+use std::marker::PhantomData;
+use std::sync::{Arc, Condvar, Mutex};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::time::Duration;
+
+/// A thread parking primitive.
+///
+/// Conceptually, each `Parker` has an associated token which is initially not present:
+///
+/// * The [`park`] method blocks the current thread unless or until the token is available, at
+/// which point it automatically consumes the token. It may also return *spuriously*, without
+/// consuming the token.
+///
+/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
+/// time.
+///
+/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
+/// token is initially absent, [`unpark`] followed by [`park`] will result in the second call
+/// returning immediately.
+///
+/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
+/// [`park`] and [`unpark`].
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use std::time::Duration;
+/// use crossbeam_utils::sync::Parker;
+///
+/// let mut p = Parker::new();
+/// let u = p.unparker().clone();
+///
+/// // Make the token available.
+/// u.unpark();
+/// // Wakes up immediately and consumes the token.
+/// p.park();
+///
+/// thread::spawn(move || {
+/// thread::sleep(Duration::from_millis(500));
+/// u.unpark();
+/// });
+///
+/// // Wakes up when `u.unpark()` provides the token, but may also wake up
+/// // spuriously before that without consuming the token.
+/// p.park();
+/// ```
+///
+/// [`park`]: struct.Parker.html#method.park
+/// [`park_timeout`]: struct.Parker.html#method.park_timeout
+/// [`unpark`]: struct.Unparker.html#method.unpark
+pub struct Parker {
+ unparker: Unparker,
+ _marker: PhantomData<*const ()>,
+}
+
+unsafe impl Send for Parker {}
+
+impl Parker {
+ /// Creates a new `Parker`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let p = Parker::new();
+ /// ```
+ ///
+ pub fn new() -> Parker {
+ Parker {
+ unparker: Unparker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ lock: Mutex::new(()),
+ cvar: Condvar::new(),
+ }),
+ },
+ _marker: PhantomData,
+ }
+ }
+
+ /// Blocks the current thread until the token is made available.
+ ///
+ /// A call to `park` may wake up spuriously without consuming the token, and callers should be
+ /// prepared for this possibility.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let mut p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// // Make the token available.
+ /// u.unpark();
+ ///
+ /// // Wakes up immediately and consumes the token.
+ /// p.park();
+ /// ```
+ pub fn park(&self) {
+ self.unparker.inner.park(None);
+ }
+
+ /// Blocks the current thread until the token is made available, but only for a limited time.
+ ///
+ /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
+ /// should be prepared for this possibility.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::time::Duration;
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let mut p = Parker::new();
+ ///
+ /// // Waits for the token to become available, but will not wait longer than 500 ms.
+ /// p.park_timeout(Duration::from_millis(500));
+ /// ```
+ pub fn park_timeout(&self, timeout: Duration) {
+ self.unparker.inner.park(Some(timeout));
+ }
+
+ /// Returns a reference to an associated [`Unparker`].
+ ///
+ /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let mut p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// // Make the token available.
+ /// u.unpark();
+ /// // Wakes up immediately and consumes the token.
+ /// p.park();
+ /// ```
+ ///
+ /// [`park`]: struct.Parker.html#method.park
+ /// [`park_timeout`]: struct.Parker.html#method.park_timeout
+ ///
+ /// [`Unparker`]: struct.Unparker.html
+ pub fn unparker(&self) -> &Unparker {
+ &self.unparker
+ }
+}
+
+impl fmt::Debug for Parker {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("Parker { .. }")
+ }
+}
+
+/// Unparks a thread parked by the associated [`Parker`].
+///
+/// [`Parker`]: struct.Parker.html
+pub struct Unparker {
+ inner: Arc<Inner>,
+}
+
+unsafe impl Send for Unparker {}
+unsafe impl Sync for Unparker {}
+
+impl Unparker {
+ /// Atomically makes the token available if it is not already.
+ ///
+ /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
+ /// any.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use crossbeam_utils::sync::Parker;
+ ///
+ /// let mut p = Parker::new();
+ /// let u = p.unparker().clone();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(500));
+ /// u.unpark();
+ /// });
+ ///
+ /// // Wakes up when `u.unpark()` provides the token, but may also wake up
+ /// // spuriously before that without consuming the token.
+ /// p.park();
+ /// ```
+ ///
+ /// [`park`]: struct.Parker.html#method.park
+ /// [`park_timeout`]: struct.Parker.html#method.park_timeout
+ pub fn unpark(&self) {
+ self.inner.unpark()
+ }
+}
+
+impl fmt::Debug for Unparker {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("Unparker { .. }")
+ }
+}
+
+impl Clone for Unparker {
+ fn clone(&self) -> Unparker {
+ Unparker {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+struct Inner {
+ state: AtomicUsize,
+ lock: Mutex<()>,
+ cvar: Condvar,
+}
+
+impl Inner {
+ fn park(&self, timeout: Option<Duration>) {
+ // If we were previously notified then we consume this notification and return quickly.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ return;
+ }
+
+ // If the timeout is zero, then there is no need to actually block.
+ if let Some(ref dur) = timeout {
+ if *dur == Duration::from_millis(0) {
+ return;
+ }
+ }
+
+ // Otherwise we need to coordinate going to sleep.
+ let mut m = self.lock.lock().unwrap();
+
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ // Consume this notification to avoid spurious wakeups in the next park.
+ Err(NOTIFIED) => {
+ // We must read `state` here, even though we know it will be `NOTIFIED`. This is
+ // because `unpark` may have been called again since we read `NOTIFIED` in the
+ // `compare_exchange` above. We must perform an acquire operation that synchronizes
+ // with that `unpark` to observe any writes it made before the call to `unpark`. To
+ // do that we must read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ }
+ Err(n) => panic!("inconsistent park_timeout state: {}", n),
+ }
+
+ match timeout {
+ None => {
+ loop {
+ // Block the current thread on the conditional variable.
+ m = self.cvar.wait(m).unwrap();
+
+ match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+ Ok(_) => return, // got a notification
+ Err(_) => {} // spurious wakeup, go back to sleep
+ }
+ }
+ }
+ Some(timeout) => {
+ // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
+ // notification we just want to unconditionally set `state` back to `EMPTY`, either
+ // consuming a notification or un-flagging ourselves as parked.
+ let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap();
+
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED => {} // got a notification
+ PARKED => {} // no notification
+ n => panic!("inconsistent park_timeout state: {}", n),
+ }
+ }
+ }
+ }
+
+ pub fn unpark(&self) {
+ // To ensure the unparked thread will observe any writes we made before this call, we must
+ // perform a release operation that `park` can synchronize with. To do that we must write
+ // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
+ // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => return, // no one was waiting
+ NOTIFIED => return, // already unparked
+ PARKED => {} // gotta go wake someone up
+ _ => panic!("inconsistent state in unpark"),
+ }
+
+ // There is a period between when the parked thread sets `state` to `PARKED` (or last
+ // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
+ // If we were to notify during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
+ // stage so we can acquire `lock` to wait until it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
+ // it doesn't get woken only to have to wait for us to release `lock`.
+ drop(self.lock.lock().unwrap());
+ self.cvar.notify_one();
+ }
+}