From 698f8c2f01ea549d77d7dc3338a12e04c11057b9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:02:58 +0200 Subject: Adding upstream version 1.64.0+dfsg1. Signed-off-by: Daniel Baumann --- library/std/src/sync/barrier.rs | 174 +++ library/std/src/sync/barrier/tests.rs | 35 + library/std/src/sync/condvar.rs | 564 +++++++++ library/std/src/sync/condvar/tests.rs | 190 +++ library/std/src/sync/lazy_lock.rs | 121 ++ library/std/src/sync/lazy_lock/tests.rs | 143 +++ library/std/src/sync/mod.rs | 189 +++ library/std/src/sync/mpsc/blocking.rs | 82 ++ library/std/src/sync/mpsc/cache_aligned.rs | 25 + library/std/src/sync/mpsc/mod.rs | 1669 +++++++++++++++++++++++++ library/std/src/sync/mpsc/mpsc_queue.rs | 117 ++ library/std/src/sync/mpsc/mpsc_queue/tests.rs | 47 + library/std/src/sync/mpsc/oneshot.rs | 315 +++++ library/std/src/sync/mpsc/shared.rs | 501 ++++++++ library/std/src/sync/mpsc/spsc_queue.rs | 236 ++++ library/std/src/sync/mpsc/spsc_queue/tests.rs | 101 ++ library/std/src/sync/mpsc/stream.rs | 457 +++++++ library/std/src/sync/mpsc/sync.rs | 495 ++++++++ library/std/src/sync/mpsc/sync_tests.rs | 647 ++++++++++ library/std/src/sync/mpsc/tests.rs | 706 +++++++++++ library/std/src/sync/mutex.rs | 553 ++++++++ library/std/src/sync/mutex/tests.rs | 238 ++++ library/std/src/sync/once.rs | 580 +++++++++ library/std/src/sync/once/tests.rs | 116 ++ library/std/src/sync/once_lock.rs | 496 ++++++++ library/std/src/sync/once_lock/tests.rs | 203 +++ library/std/src/sync/poison.rs | 272 ++++ library/std/src/sync/rwlock.rs | 615 +++++++++ library/std/src/sync/rwlock/tests.rs | 259 ++++ 29 files changed, 10146 insertions(+) create mode 100644 library/std/src/sync/barrier.rs create mode 100644 library/std/src/sync/barrier/tests.rs create mode 100644 library/std/src/sync/condvar.rs create mode 100644 library/std/src/sync/condvar/tests.rs create mode 100644 library/std/src/sync/lazy_lock.rs create mode 100644 library/std/src/sync/lazy_lock/tests.rs create mode 100644 library/std/src/sync/mod.rs create mode 100644 library/std/src/sync/mpsc/blocking.rs create mode 100644 library/std/src/sync/mpsc/cache_aligned.rs create mode 100644 library/std/src/sync/mpsc/mod.rs create mode 100644 library/std/src/sync/mpsc/mpsc_queue.rs create mode 100644 library/std/src/sync/mpsc/mpsc_queue/tests.rs create mode 100644 library/std/src/sync/mpsc/oneshot.rs create mode 100644 library/std/src/sync/mpsc/shared.rs create mode 100644 library/std/src/sync/mpsc/spsc_queue.rs create mode 100644 library/std/src/sync/mpsc/spsc_queue/tests.rs create mode 100644 library/std/src/sync/mpsc/stream.rs create mode 100644 library/std/src/sync/mpsc/sync.rs create mode 100644 library/std/src/sync/mpsc/sync_tests.rs create mode 100644 library/std/src/sync/mpsc/tests.rs create mode 100644 library/std/src/sync/mutex.rs create mode 100644 library/std/src/sync/mutex/tests.rs create mode 100644 library/std/src/sync/once.rs create mode 100644 library/std/src/sync/once/tests.rs create mode 100644 library/std/src/sync/once_lock.rs create mode 100644 library/std/src/sync/once_lock/tests.rs create mode 100644 library/std/src/sync/poison.rs create mode 100644 library/std/src/sync/rwlock.rs create mode 100644 library/std/src/sync/rwlock/tests.rs (limited to 'library/std/src/sync') diff --git a/library/std/src/sync/barrier.rs b/library/std/src/sync/barrier.rs new file mode 100644 index 000000000..11836b7b6 --- /dev/null +++ b/library/std/src/sync/barrier.rs @@ -0,0 +1,174 @@ +#[cfg(test)] +mod tests; + +use crate::fmt; +use 
crate::sync::{Condvar, Mutex}; + +/// A barrier enables multiple threads to synchronize the beginning +/// of some computation. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Barrier}; +/// use std::thread; +/// +/// let mut handles = Vec::with_capacity(10); +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in 0..10 { +/// let c = Arc::clone(&barrier); +/// // The same messages will be printed together. +/// // You will NOT see any interleaving. +/// handles.push(thread::spawn(move|| { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// })); +/// } +/// // Wait for other threads to finish. +/// for handle in handles { +/// handle.join().unwrap(); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Barrier { + lock: Mutex, + cvar: Condvar, + num_threads: usize, +} + +// The inner state of a double barrier +struct BarrierState { + count: usize, + generation_id: usize, +} + +/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads +/// in the [`Barrier`] have rendezvoused. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Barrier; +/// +/// let barrier = Barrier::new(1); +/// let barrier_wait_result = barrier.wait(); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct BarrierWaitResult(bool); + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for Barrier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Barrier").finish_non_exhaustive() + } +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`wait()`] and then wake + /// up all threads at once when the `n`th thread calls [`wait()`]. + /// + /// [`wait()`]: Barrier::wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(10); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[must_use] + pub fn new(n: usize) -> Barrier { + Barrier { + lock: Mutex::new(BarrierState { count: 0, generation_id: 0 }), + cvar: Condvar::new(), + num_threads: n, + } + } + + /// Blocks the current thread until all threads have rendezvoused here. + /// + /// Barriers are re-usable after all threads have rendezvoused once, and can + /// be used continuously. + /// + /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that + /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning + /// from this function, and all other threads will receive a result that + /// will return `false` from [`BarrierWaitResult::is_leader()`]. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Barrier}; + /// use std::thread; + /// + /// let mut handles = Vec::with_capacity(10); + /// let barrier = Arc::new(Barrier::new(10)); + /// for _ in 0..10 { + /// let c = Arc::clone(&barrier); + /// // The same messages will be printed together. + /// // You will NOT see any interleaving. + /// handles.push(thread::spawn(move|| { + /// println!("before wait"); + /// c.wait(); + /// println!("after wait"); + /// })); + /// } + /// // Wait for other threads to finish. + /// for handle in handles { + /// handle.join().unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn wait(&self) -> BarrierWaitResult { + let mut lock = self.lock.lock().unwrap(); + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_threads { + // We need a while loop to guard against spurious wakeups. 
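+            // The barrier is only considered passed once the leader thread has
+            // advanced `generation_id`; any other return from `wait` below is
+            // treated as a spurious wakeup and we go back to sleep.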
+ // https://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id { + lock = self.cvar.wait(lock).unwrap(); + } + BarrierWaitResult(false) + } else { + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + self.cvar.notify_all(); + BarrierWaitResult(true) + } + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for BarrierWaitResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BarrierWaitResult").field("is_leader", &self.is_leader()).finish() + } +} + +impl BarrierWaitResult { + /// Returns `true` if this thread is the "leader thread" for the call to + /// [`Barrier::wait()`]. + /// + /// Only one thread will have `true` returned from their result, all other + /// threads will have `false` returned. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(1); + /// let barrier_wait_result = barrier.wait(); + /// println!("{:?}", barrier_wait_result.is_leader()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[must_use] + pub fn is_leader(&self) -> bool { + self.0 + } +} diff --git a/library/std/src/sync/barrier/tests.rs b/library/std/src/sync/barrier/tests.rs new file mode 100644 index 000000000..834a3e751 --- /dev/null +++ b/library/std/src/sync/barrier/tests.rs @@ -0,0 +1,35 @@ +use crate::sync::mpsc::{channel, TryRecvError}; +use crate::sync::{Arc, Barrier}; +use crate::thread; + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn test_barrier() { + const N: usize = 10; + + let barrier = Arc::new(Barrier::new(N)); + let (tx, rx) = channel(); + + for _ in 0..N - 1 { + let c = barrier.clone(); + let tx = tx.clone(); + thread::spawn(move || { + tx.send(c.wait().is_leader()).unwrap(); + }); + } + + // At this point, all spawned threads should be blocked, + // so we shouldn't get anything from the port + assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); + + let mut leader_found = barrier.wait().is_leader(); + + // Now, the barrier is cleared and we should get data. + for _ in 0..N - 1 { + if rx.recv().unwrap() { + assert!(!leader_found); + leader_found = true; + } + } + assert!(leader_found); +} diff --git a/library/std/src/sync/condvar.rs b/library/std/src/sync/condvar.rs new file mode 100644 index 000000000..eb1e7135a --- /dev/null +++ b/library/std/src/sync/condvar.rs @@ -0,0 +1,564 @@ +#[cfg(test)] +mod tests; + +use crate::fmt; +use crate::sync::{mutex, poison, LockResult, MutexGuard, PoisonError}; +use crate::sys_common::condvar as sys; +use crate::time::{Duration, Instant}; + +/// A type indicating whether a timed wait on a condition variable returned +/// due to a time out or not. +/// +/// It is returned by the [`wait_timeout`] method. +/// +/// [`wait_timeout`]: Condvar::wait_timeout +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +#[stable(feature = "wait_timeout", since = "1.5.0")] +pub struct WaitTimeoutResult(bool); + +impl WaitTimeoutResult { + /// Returns `true` if the wait was known to have timed out. + /// + /// # Examples + /// + /// This example spawns a thread which will update the boolean value and + /// then wait 100 milliseconds before notifying the condvar. + /// + /// The main thread will wait with a timeout on the condvar and then leave + /// once the boolean has been updated and notified. 
+ /// + /// ``` + /// use std::sync::{Arc, Condvar, Mutex}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move || { + /// let (lock, cvar) = &*pair2; + /// + /// // Let's wait 20 milliseconds before notifying the condvar. + /// thread::sleep(Duration::from_millis(20)); + /// + /// let mut started = lock.lock().unwrap(); + /// // We update the boolean value. + /// *started = true; + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// loop { + /// // Let's put a timeout on the condvar's wait. + /// let result = cvar.wait_timeout(started, Duration::from_millis(10)).unwrap(); + /// // 10 milliseconds have passed, or maybe the value changed! + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// ``` + #[must_use] + #[stable(feature = "wait_timeout", since = "1.5.0")] + pub fn timed_out(&self) -> bool { + self.0 + } +} + +/// A Condition Variable +/// +/// Condition variables represent the ability to block a thread such that it +/// consumes no CPU time while waiting for an event to occur. Condition +/// variables are typically associated with a boolean predicate (a condition) +/// and a mutex. The predicate is always verified inside of the mutex before +/// determining that a thread must block. +/// +/// Functions in this module will block the current **thread** of execution. +/// Note that any attempt to use multiple mutexes on the same condition +/// variable may result in a runtime panic. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Mutex, Condvar}; +/// use std::thread; +/// +/// let pair = Arc::new((Mutex::new(false), Condvar::new())); +/// let pair2 = Arc::clone(&pair); +/// +/// // Inside of our lock, spawn a new thread, and then wait for it to start. +/// thread::spawn(move|| { +/// let (lock, cvar) = &*pair2; +/// let mut started = lock.lock().unwrap(); +/// *started = true; +/// // We notify the condvar that the value has changed. +/// cvar.notify_one(); +/// }); +/// +/// // Wait for the thread to start up. +/// let (lock, cvar) = &*pair; +/// let mut started = lock.lock().unwrap(); +/// while !*started { +/// started = cvar.wait(started).unwrap(); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Condvar { + inner: sys::Condvar, +} + +impl Condvar { + /// Creates a new condition variable which is ready to be waited on and + /// notified. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Condvar; + /// + /// let condvar = Condvar::new(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] + #[must_use] + #[inline] + pub const fn new() -> Condvar { + Condvar { inner: sys::Condvar::new() } + } + + /// Blocks the current thread until this condition variable receives a + /// notification. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. 
+ /// + /// Note that this function is susceptible to spurious wakeups. Condition + /// variables normally have a boolean predicate associated with them, and + /// the predicate must always be checked each time this function returns to + /// protect against spurious wakeups. + /// + /// # Errors + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about [poisoning] on the [`Mutex`] type. + /// + /// # Panics + /// + /// This function may [`panic!`] if it is used with more than one mutex + /// over time. + /// + /// [`notify_one`]: Self::notify_one + /// [`notify_all`]: Self::notify_all + /// [poisoning]: super::Mutex#poisoning + /// [`Mutex`]: super::Mutex + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult> { + let poisoned = unsafe { + let lock = mutex::guard_lock(&guard); + self.inner.wait(lock); + mutex::guard_poison(&guard).get() + }; + if poisoned { Err(PoisonError::new(guard)) } else { Ok(guard) } + } + + /// Blocks the current thread until this condition variable receives a + /// notification and the provided condition is false. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `guard`) and block the current thread. This means that any calls + /// to [`notify_one`] or [`notify_all`] which happen logically after the + /// mutex is unlocked are candidates to wake this thread up. When this + /// function call returns, the lock specified will have been re-acquired. + /// + /// # Errors + /// + /// This function will return an error if the mutex being waited on is + /// poisoned when this thread re-acquires the lock. For more information, + /// see information about [poisoning] on the [`Mutex`] type. + /// + /// [`notify_one`]: Self::notify_one + /// [`notify_all`]: Self::notify_all + /// [poisoning]: super::Mutex#poisoning + /// [`Mutex`]: super::Mutex + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(true), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut pending = lock.lock().unwrap(); + /// *pending = false; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// // As long as the value inside the `Mutex` is `true`, we wait. 
+ /// let _guard = cvar.wait_while(lock.lock().unwrap(), |pending| { *pending }).unwrap(); + /// ``` + #[stable(feature = "wait_until", since = "1.42.0")] + pub fn wait_while<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + mut condition: F, + ) -> LockResult> + where + F: FnMut(&mut T) -> bool, + { + while condition(&mut *guard) { + guard = self.wait(guard)?; + } + Ok(guard) + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait`] + /// except that the thread will be blocked for roughly no longer + /// than `ms` milliseconds. This method should not be used for + /// precise timing due to anomalies such as preemption or platform + /// differences that might not cause the maximum amount of time + /// waited to be precisely `ms`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned boolean is `false` only if the timeout is known + /// to have elapsed. + /// + /// Like [`wait`], the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait`]: Self::wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex` is `false`, we wait. + /// loop { + /// let result = cvar.wait_timeout_ms(started, 10).unwrap(); + /// // 10 milliseconds have passed, or maybe the value changed! + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[deprecated(since = "1.6.0", note = "replaced by `std::sync::Condvar::wait_timeout`")] + pub fn wait_timeout_ms<'a, T>( + &self, + guard: MutexGuard<'a, T>, + ms: u32, + ) -> LockResult<(MutexGuard<'a, T>, bool)> { + let res = self.wait_timeout(guard, Duration::from_millis(ms as u64)); + poison::map_result(res, |(a, b)| (a, !b.timed_out())) + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait`] except that + /// the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that might not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. This function is susceptible to spurious wakeups. 
+ /// Condition variables normally have a boolean predicate associated with + /// them, and the predicate must always be checked each time this function + /// returns to protect against spurious wakeups. Additionally, it is + /// typically desirable for the timeout to not exceed some duration in + /// spite of spurious wakes, thus the sleep-duration is decremented by the + /// amount slept. Alternatively, use the `wait_timeout_while` method + /// to wait with a timeout while a predicate is true. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed. + /// + /// Like [`wait`], the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// [`wait`]: Self::wait + /// [`wait_timeout_while`]: Self::wait_timeout_while + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // as long as the value inside the `Mutex` is `false`, we wait + /// loop { + /// let result = cvar.wait_timeout(started, Duration::from_millis(10)).unwrap(); + /// // 10 milliseconds have passed, or maybe the value changed! + /// started = result.0; + /// if *started == true { + /// // We received the notification and the value has been updated, we can leave. + /// break + /// } + /// } + /// ``` + #[stable(feature = "wait_timeout", since = "1.5.0")] + pub fn wait_timeout<'a, T>( + &self, + guard: MutexGuard<'a, T>, + dur: Duration, + ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> { + let (poisoned, result) = unsafe { + let lock = mutex::guard_lock(&guard); + let success = self.inner.wait_timeout(lock, dur); + (mutex::guard_poison(&guard).get(), WaitTimeoutResult(!success)) + }; + if poisoned { Err(PoisonError::new((guard, result))) } else { Ok((guard, result)) } + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. + /// + /// The semantics of this function are equivalent to [`wait_while`] except + /// that the thread will be blocked for roughly no longer than `dur`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that might not cause the maximum + /// amount of time waited to be precisely `dur`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned [`WaitTimeoutResult`] value indicates if the timeout is + /// known to have elapsed without the condition being met. + /// + /// Like [`wait_while`], the lock specified will be re-acquired when this + /// function returns, regardless of whether the timeout elapsed or not. 
+ /// + /// [`wait_while`]: Self::wait_while + /// [`wait_timeout`]: Self::wait_timeout + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let pair = Arc::new((Mutex::new(true), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut pending = lock.lock().unwrap(); + /// *pending = false; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // wait for the thread to start up + /// let (lock, cvar) = &*pair; + /// let result = cvar.wait_timeout_while( + /// lock.lock().unwrap(), + /// Duration::from_millis(100), + /// |&mut pending| pending, + /// ).unwrap(); + /// if result.1.timed_out() { + /// // timed-out without the condition ever evaluating to false. + /// } + /// // access the locked mutex via result.0 + /// ``` + #[stable(feature = "wait_timeout_until", since = "1.42.0")] + pub fn wait_timeout_while<'a, T, F>( + &self, + mut guard: MutexGuard<'a, T>, + dur: Duration, + mut condition: F, + ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> + where + F: FnMut(&mut T) -> bool, + { + let start = Instant::now(); + loop { + if !condition(&mut *guard) { + return Ok((guard, WaitTimeoutResult(false))); + } + let timeout = match dur.checked_sub(start.elapsed()) { + Some(timeout) => timeout, + None => return Ok((guard, WaitTimeoutResult(true))), + }; + guard = self.wait_timeout(guard, timeout)?.0; + } + } + + /// Wakes up one blocked thread on this condvar. + /// + /// If there is a blocked thread on this condition variable, then it will + /// be woken up from its call to [`wait`] or [`wait_timeout`]. Calls to + /// `notify_one` are not buffered in any way. + /// + /// To wake up all threads, see [`notify_all`]. + /// + /// [`wait`]: Self::wait + /// [`wait_timeout`]: Self::wait_timeout + /// [`notify_all`]: Self::notify_all + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_one(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn notify_one(&self) { + self.inner.notify_one() + } + + /// Wakes up all blocked threads on this condvar. + /// + /// This method will ensure that any current waiters on the condition + /// variable are awoken. Calls to `notify_all()` are not buffered in any + /// way. + /// + /// To wake up only one thread, see [`notify_one`]. 
+ /// + /// [`notify_one`]: Self::notify_one + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex, Condvar}; + /// use std::thread; + /// + /// let pair = Arc::new((Mutex::new(false), Condvar::new())); + /// let pair2 = Arc::clone(&pair); + /// + /// thread::spawn(move|| { + /// let (lock, cvar) = &*pair2; + /// let mut started = lock.lock().unwrap(); + /// *started = true; + /// // We notify the condvar that the value has changed. + /// cvar.notify_all(); + /// }); + /// + /// // Wait for the thread to start up. + /// let (lock, cvar) = &*pair; + /// let mut started = lock.lock().unwrap(); + /// // As long as the value inside the `Mutex` is `false`, we wait. + /// while !*started { + /// started = cvar.wait(started).unwrap(); + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn notify_all(&self) { + self.inner.notify_all() + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Condvar").finish_non_exhaustive() + } +} + +#[stable(feature = "condvar_default", since = "1.10.0")] +impl Default for Condvar { + /// Creates a `Condvar` which is ready to be waited on and notified. + fn default() -> Condvar { + Condvar::new() + } +} diff --git a/library/std/src/sync/condvar/tests.rs b/library/std/src/sync/condvar/tests.rs new file mode 100644 index 000000000..24f467f0b --- /dev/null +++ b/library/std/src/sync/condvar/tests.rs @@ -0,0 +1,190 @@ +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::mpsc::channel; +use crate::sync::{Arc, Condvar, Mutex}; +use crate::thread; +use crate::time::Duration; + +#[test] +fn smoke() { + let c = Condvar::new(); + c.notify_one(); + c.notify_all(); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn notify_one() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + c2.notify_one(); + }); + let g = c.wait(g).unwrap(); + drop(g); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn notify_all() { + const N: usize = 10; + + let data = Arc::new((Mutex::new(0), Condvar::new())); + let (tx, rx) = channel(); + for _ in 0..N { + let data = data.clone(); + let tx = tx.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock().unwrap(); + *cnt += 1; + if *cnt == N { + tx.send(()).unwrap(); + } + while *cnt != 0 { + cnt = cond.wait(cnt).unwrap(); + } + tx.send(()).unwrap(); + }); + } + drop(tx); + + let &(ref lock, ref cond) = &*data; + rx.recv().unwrap(); + let mut cnt = lock.lock().unwrap(); + *cnt = 0; + cond.notify_all(); + drop(cnt); + + for _ in 0..N { + rx.recv().unwrap(); + } +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_while() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. 
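+    // `wait_while` re-checks the predicate before parking, so this is not racy
+    // even if the spawned thread has already set the flag and called notify.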
+ let &(ref lock, ref cvar) = &*pair; + let guard = cvar.wait_while(lock.lock().unwrap(), |started| !*started); + assert!(*guard.unwrap()); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not timeout + if !no_timeout.timed_out() { + continue; + } + + break; + } +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_while_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true).unwrap(); + // no spurious wakeups. ensure it timed-out + assert!(wait.timed_out()); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_while_instant_satisfy() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock().unwrap(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false).unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_while_wake() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_copy = pair.clone(); + + let &(ref m, ref c) = &*pair; + let g = m.lock().unwrap(); + let _t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair_copy; + let mut started = lock.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + *started = true; + cvar.notify_one(); + }); + let (g2, wait) = c + .wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| !notified) + .unwrap(); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + assert!(*g2); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn wait_timeout_wake() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + + let c2 = c.clone(); + let m2 = m.clone(); + + let notified = Arc::new(AtomicBool::new(false)); + let notified_copy = notified.clone(); + + let t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + notified_copy.store(true, Ordering::SeqCst); + c2.notify_one(); + }); + let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); + assert!(!timeout_res.timed_out()); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not notified + if !notified.load(Ordering::SeqCst) { + t.join().unwrap(); + continue; + } + drop(g); + + t.join().unwrap(); + + break; + } +} diff --git a/library/std/src/sync/lazy_lock.rs b/library/std/src/sync/lazy_lock.rs new file mode 100644 index 000000000..535cc1c42 --- /dev/null +++ b/library/std/src/sync/lazy_lock.rs @@ -0,0 +1,121 @@ +use crate::cell::Cell; +use crate::fmt; +use crate::ops::Deref; +use crate::panic::{RefUnwindSafe, UnwindSafe}; +use crate::sync::OnceLock; + +/// A value which is initialized on the first access. +/// +/// This type is a thread-safe `Lazy`, and can be used in statics. 
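+/// The initializing closure runs at most once, on first access; concurrent
+/// accesses from other threads block until initialization is complete.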
+///
+/// # Examples
+///
+/// ```
+/// #![feature(once_cell)]
+///
+/// use std::collections::HashMap;
+///
+/// use std::sync::LazyLock;
+///
+/// static HASHMAP: LazyLock<HashMap<i32, String>> = LazyLock::new(|| {
+///     println!("initializing");
+///     let mut m = HashMap::new();
+///     m.insert(13, "Spica".to_string());
+///     m.insert(74, "Hoyten".to_string());
+///     m
+/// });
+///
+/// fn main() {
+///     println!("ready");
+///     std::thread::spawn(|| {
+///         println!("{:?}", HASHMAP.get(&13));
+///     }).join().unwrap();
+///     println!("{:?}", HASHMAP.get(&74));
+///
+///     // Prints:
+///     //   ready
+///     //   initializing
+///     //   Some("Spica")
+///     //   Some("Hoyten")
+/// }
+/// ```
+#[unstable(feature = "once_cell", issue = "74465")]
+pub struct LazyLock<T, F = fn() -> T> {
+    cell: OnceLock<T>,
+    init: Cell<Option<F>>,
+}
+
+impl<T, F> LazyLock<T, F> {
+    /// Creates a new lazy value with the given initializing
+    /// function.
+    #[unstable(feature = "once_cell", issue = "74465")]
+    pub const fn new(f: F) -> LazyLock<T, F> {
+        LazyLock { cell: OnceLock::new(), init: Cell::new(Some(f)) }
+    }
+}
+
+impl<T, F: FnOnce() -> T> LazyLock<T, F> {
+    /// Forces the evaluation of this lazy value and
+    /// returns a reference to result. This is equivalent
+    /// to the `Deref` impl, but is explicit.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// #![feature(once_cell)]
+    ///
+    /// use std::sync::LazyLock;
+    ///
+    /// let lazy = LazyLock::new(|| 92);
+    ///
+    /// assert_eq!(LazyLock::force(&lazy), &92);
+    /// assert_eq!(&*lazy, &92);
+    /// ```
+    #[unstable(feature = "once_cell", issue = "74465")]
+    pub fn force(this: &LazyLock<T, F>) -> &T {
+        this.cell.get_or_init(|| match this.init.take() {
+            Some(f) => f(),
+            None => panic!("Lazy instance has previously been poisoned"),
+        })
+    }
+}
+
+#[unstable(feature = "once_cell", issue = "74465")]
+impl<T, F: FnOnce() -> T> Deref for LazyLock<T, F> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        LazyLock::force(self)
+    }
+}
+
+#[unstable(feature = "once_cell", issue = "74465")]
+impl<T: Default> Default for LazyLock<T> {
+    /// Creates a new lazy value using `Default` as the initializing function.
+    fn default() -> LazyLock<T> {
+        LazyLock::new(T::default)
+    }
+}
+
+#[unstable(feature = "once_cell", issue = "74465")]
+impl<T: fmt::Debug, F> fmt::Debug for LazyLock<T, F> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Lazy").field("cell", &self.cell).finish_non_exhaustive()
+    }
+}
+
+// We never create a `&F` from a `&LazyLock<T, F>` so it is fine
+// to not impl `Sync` for `F`
+// we do create a `&mut Option<F>` in `force`, but this is
+// properly synchronized, so it only happens once
+// so it also does not contribute to this impl.
+#[unstable(feature = "once_cell", issue = "74465")]
+unsafe impl<T, F: Send> Sync for LazyLock<T, F> where OnceLock<T>: Sync {}
+// auto-derived `Send` impl is OK.
+
+#[unstable(feature = "once_cell", issue = "74465")]
+impl<T, F: UnwindSafe> RefUnwindSafe for LazyLock<T, F> where OnceLock<T>: RefUnwindSafe {}
+#[unstable(feature = "once_cell", issue = "74465")]
+impl<T, F: UnwindSafe> UnwindSafe for LazyLock<T, F> where OnceLock<T>: UnwindSafe {}
+
+#[cfg(test)]
+mod tests;
diff --git a/library/std/src/sync/lazy_lock/tests.rs b/library/std/src/sync/lazy_lock/tests.rs
new file mode 100644
index 000000000..f11b66bfc
--- /dev/null
+++ b/library/std/src/sync/lazy_lock/tests.rs
@@ -0,0 +1,143 @@
+use crate::{
+    cell::LazyCell,
+    panic,
+    sync::{
+        atomic::{AtomicUsize, Ordering::SeqCst},
+        Mutex,
+    },
+    sync::{LazyLock, OnceLock},
+    thread,
+};
+
+fn spawn_and_wait<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
+    thread::spawn(f).join().unwrap()
+}
+
+#[test]
+fn lazy_default() {
+    static CALLED: AtomicUsize = AtomicUsize::new(0);
+
+    struct Foo(u8);
+    impl Default for Foo {
+        fn default() -> Self {
+            CALLED.fetch_add(1, SeqCst);
+            Foo(42)
+        }
+    }
+
+    let lazy: LazyCell<Mutex<Foo>> = <_>::default();
+
+    assert_eq!(CALLED.load(SeqCst), 0);
+
+    assert_eq!(lazy.lock().unwrap().0, 42);
+    assert_eq!(CALLED.load(SeqCst), 1);
+
+    lazy.lock().unwrap().0 = 21;
+
+    assert_eq!(lazy.lock().unwrap().0, 21);
+    assert_eq!(CALLED.load(SeqCst), 1);
+}
+
+#[test]
+fn lazy_poisoning() {
+    let x: LazyCell<String> = LazyCell::new(|| panic!("kaboom"));
+    for _ in 0..2 {
+        let res = panic::catch_unwind(panic::AssertUnwindSafe(|| x.len()));
+        assert!(res.is_err());
+    }
+}
+
+#[test]
+#[cfg_attr(target_os = "emscripten", ignore)]
+fn sync_lazy_new() {
+    static CALLED: AtomicUsize = AtomicUsize::new(0);
+    static SYNC_LAZY: LazyLock<i32> = LazyLock::new(|| {
+        CALLED.fetch_add(1, SeqCst);
+        92
+    });
+
+    assert_eq!(CALLED.load(SeqCst), 0);
+
+    spawn_and_wait(|| {
+        let y = *SYNC_LAZY - 30;
+        assert_eq!(y, 62);
+        assert_eq!(CALLED.load(SeqCst), 1);
+    });
+
+    let y = *SYNC_LAZY - 30;
+    assert_eq!(y, 62);
+    assert_eq!(CALLED.load(SeqCst), 1);
+}
+
+#[test]
+fn sync_lazy_default() {
+    static CALLED: AtomicUsize = AtomicUsize::new(0);
+
+    struct Foo(u8);
+    impl Default for Foo {
+        fn default() -> Self {
+            CALLED.fetch_add(1, SeqCst);
+            Foo(42)
+        }
+    }
+
+    let lazy: LazyLock<Mutex<Foo>> = <_>::default();
+
+    assert_eq!(CALLED.load(SeqCst), 0);
+
+    assert_eq!(lazy.lock().unwrap().0, 42);
+    assert_eq!(CALLED.load(SeqCst), 1);
+
+    lazy.lock().unwrap().0 = 21;
+
+    assert_eq!(lazy.lock().unwrap().0, 21);
+    assert_eq!(CALLED.load(SeqCst), 1);
+}
+
+#[test]
+#[cfg_attr(target_os = "emscripten", ignore)]
+fn static_sync_lazy() {
+    static XS: LazyLock<Vec<i32>> = LazyLock::new(|| {
+        let mut xs = Vec::new();
+        xs.push(1);
+        xs.push(2);
+        xs.push(3);
+        xs
+    });
+
+    spawn_and_wait(|| {
+        assert_eq!(&*XS, &vec![1, 2, 3]);
+    });
+
+    assert_eq!(&*XS, &vec![1, 2, 3]);
+}
+
+#[test]
+fn static_sync_lazy_via_fn() {
+    fn xs() -> &'static Vec<i32> {
+        static XS: OnceLock<Vec<i32>> = OnceLock::new();
+        XS.get_or_init(|| {
+            let mut xs = Vec::new();
+            xs.push(1);
+            xs.push(2);
+            xs.push(3);
+            xs
+        })
+    }
+    assert_eq!(xs(), &vec![1, 2, 3]);
+}
+
+#[test]
+fn sync_lazy_poisoning() {
+    let x: LazyLock<String> = LazyLock::new(|| panic!("kaboom"));
+    for _ in 0..2 {
+        let res = panic::catch_unwind(|| x.len());
+        assert!(res.is_err());
+    }
+}
+
+#[test]
+fn is_sync_send() {
+    fn assert_traits<T: Send + Sync>() {}
+    assert_traits::<LazyLock<String>>();
+}
diff --git a/library/std/src/sync/mod.rs b/library/std/src/sync/mod.rs
new file mode 100644
index 000000000..7b507a169
--- /dev/null
+++ b/library/std/src/sync/mod.rs
@@ -0,0 +1,189 @@
+//! Useful synchronization primitives.
+//!
+//! ## The need for synchronization
+//!
+//!
Conceptually, a Rust program is a series of operations which will +//! be executed on a computer. The timeline of events happening in the +//! program is consistent with the order of the operations in the code. +//! +//! Consider the following code, operating on some global static variables: +//! +//! ```rust +//! static mut A: u32 = 0; +//! static mut B: u32 = 0; +//! static mut C: u32 = 0; +//! +//! fn main() { +//! unsafe { +//! A = 3; +//! B = 4; +//! A = A + B; +//! C = B; +//! println!("{A} {B} {C}"); +//! C = A; +//! } +//! } +//! ``` +//! +//! It appears as if some variables stored in memory are changed, an addition +//! is performed, result is stored in `A` and the variable `C` is +//! modified twice. +//! +//! When only a single thread is involved, the results are as expected: +//! the line `7 4 4` gets printed. +//! +//! As for what happens behind the scenes, when optimizations are enabled the +//! final generated machine code might look very different from the code: +//! +//! - The first store to `C` might be moved before the store to `A` or `B`, +//! _as if_ we had written `C = 4; A = 3; B = 4`. +//! +//! - Assignment of `A + B` to `A` might be removed, since the sum can be stored +//! in a temporary location until it gets printed, with the global variable +//! never getting updated. +//! +//! - The final result could be determined just by looking at the code +//! at compile time, so [constant folding] might turn the whole +//! block into a simple `println!("7 4 4")`. +//! +//! The compiler is allowed to perform any combination of these +//! optimizations, as long as the final optimized code, when executed, +//! produces the same results as the one without optimizations. +//! +//! Due to the [concurrency] involved in modern computers, assumptions +//! about the program's execution order are often wrong. Access to +//! global variables can lead to nondeterministic results, **even if** +//! compiler optimizations are disabled, and it is **still possible** +//! to introduce synchronization bugs. +//! +//! Note that thanks to Rust's safety guarantees, accessing global (static) +//! variables requires `unsafe` code, assuming we don't use any of the +//! synchronization primitives in this module. +//! +//! [constant folding]: https://en.wikipedia.org/wiki/Constant_folding +//! [concurrency]: https://en.wikipedia.org/wiki/Concurrency_(computer_science) +//! +//! ## Out-of-order execution +//! +//! Instructions can execute in a different order from the one we define, due to +//! various reasons: +//! +//! - The **compiler** reordering instructions: If the compiler can issue an +//! instruction at an earlier point, it will try to do so. For example, it +//! might hoist memory loads at the top of a code block, so that the CPU can +//! start [prefetching] the values from memory. +//! +//! In single-threaded scenarios, this can cause issues when writing +//! signal handlers or certain kinds of low-level code. +//! Use [compiler fences] to prevent this reordering. +//! +//! - A **single processor** executing instructions [out-of-order]: +//! Modern CPUs are capable of [superscalar] execution, +//! i.e., multiple instructions might be executing at the same time, +//! even though the machine code describes a sequential process. +//! +//! This kind of reordering is handled transparently by the CPU. +//! +//! - A **multiprocessor** system executing multiple hardware threads +//! at the same time: In multi-threaded scenarios, you can use two +//! 
kinds of primitives to deal with synchronization: +//! - [memory fences] to ensure memory accesses are made visible to +//! other CPUs in the right order. +//! - [atomic operations] to ensure simultaneous access to the same +//! memory location doesn't lead to undefined behavior. +//! +//! [prefetching]: https://en.wikipedia.org/wiki/Cache_prefetching +//! [compiler fences]: crate::sync::atomic::compiler_fence +//! [out-of-order]: https://en.wikipedia.org/wiki/Out-of-order_execution +//! [superscalar]: https://en.wikipedia.org/wiki/Superscalar_processor +//! [memory fences]: crate::sync::atomic::fence +//! [atomic operations]: crate::sync::atomic +//! +//! ## Higher-level synchronization objects +//! +//! Most of the low-level synchronization primitives are quite error-prone and +//! inconvenient to use, which is why the standard library also exposes some +//! higher-level synchronization objects. +//! +//! These abstractions can be built out of lower-level primitives. +//! For efficiency, the sync objects in the standard library are usually +//! implemented with help from the operating system's kernel, which is +//! able to reschedule the threads while they are blocked on acquiring +//! a lock. +//! +//! The following is an overview of the available synchronization +//! objects: +//! +//! - [`Arc`]: Atomically Reference-Counted pointer, which can be used +//! in multithreaded environments to prolong the lifetime of some +//! data until all the threads have finished using it. +//! +//! - [`Barrier`]: Ensures multiple threads will wait for each other +//! to reach a point in the program, before continuing execution all +//! together. +//! +//! - [`Condvar`]: Condition Variable, providing the ability to block +//! a thread while waiting for an event to occur. +//! +//! - [`mpsc`]: Multi-producer, single-consumer queues, used for +//! message-based communication. Can provide a lightweight +//! inter-thread synchronisation mechanism, at the cost of some +//! extra memory. +//! +//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at +//! most one thread at a time is able to access some data. +//! +//! - [`Once`]: Used for thread-safe, one-time initialization of a +//! global variable. +//! +//! - [`RwLock`]: Provides a mutual exclusion mechanism which allows +//! multiple readers at the same time, while allowing only one +//! writer at a time. In some cases, this can be more efficient than +//! a mutex. +//! +//! [`Arc`]: crate::sync::Arc +//! [`Barrier`]: crate::sync::Barrier +//! [`Condvar`]: crate::sync::Condvar +//! [`mpsc`]: crate::sync::mpsc +//! [`Mutex`]: crate::sync::Mutex +//! [`Once`]: crate::sync::Once +//! 
[`RwLock`]: crate::sync::RwLock + +#![stable(feature = "rust1", since = "1.0.0")] + +#[stable(feature = "rust1", since = "1.0.0")] +pub use alloc_crate::sync::{Arc, Weak}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use core::sync::atomic; +#[unstable(feature = "exclusive_wrapper", issue = "98407")] +pub use core::sync::Exclusive; + +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::barrier::{Barrier, BarrierWaitResult}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::condvar::{Condvar, WaitTimeoutResult}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::mutex::{Mutex, MutexGuard}; +#[stable(feature = "rust1", since = "1.0.0")] +#[allow(deprecated)] +pub use self::once::{Once, OnceState, ONCE_INIT}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::poison::{LockResult, PoisonError, TryLockError, TryLockResult}; +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +#[unstable(feature = "once_cell", issue = "74465")] +pub use self::lazy_lock::LazyLock; +#[unstable(feature = "once_cell", issue = "74465")] +pub use self::once_lock::OnceLock; + +pub mod mpsc; + +mod barrier; +mod condvar; +mod lazy_lock; +mod mutex; +mod once; +mod once_lock; +mod poison; +mod rwlock; diff --git a/library/std/src/sync/mpsc/blocking.rs b/library/std/src/sync/mpsc/blocking.rs new file mode 100644 index 000000000..021df7b09 --- /dev/null +++ b/library/std/src/sync/mpsc/blocking.rs @@ -0,0 +1,82 @@ +//! Generic support for building blocking abstractions. + +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::Arc; +use crate::thread::{self, Thread}; +use crate::time::Instant; + +struct Inner { + thread: Thread, + woken: AtomicBool, +} + +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +#[derive(Clone)] +pub struct SignalToken { + inner: Arc, +} + +pub struct WaitToken { + inner: Arc, +} + +impl !Send for WaitToken {} + +impl !Sync for WaitToken {} + +pub fn tokens() -> (WaitToken, SignalToken) { + let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) }); + let wait_token = WaitToken { inner: inner.clone() }; + let signal_token = SignalToken { inner }; + (wait_token, signal_token) +} + +impl SignalToken { + pub fn signal(&self) -> bool { + let wake = self + .inner + .woken + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok(); + if wake { + self.inner.thread.unpark(); + } + wake + } + + /// Converts to an unsafe raw pointer. Useful for storing in a pipe's state + /// flag. + #[inline] + pub unsafe fn to_raw(self) -> *mut u8 { + Arc::into_raw(self.inner) as *mut u8 + } + + /// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state + /// flag. + #[inline] + pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken { + SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) } + } +} + +impl WaitToken { + pub fn wait(self) { + while !self.inner.woken.load(Ordering::SeqCst) { + thread::park() + } + } + + /// Returns `true` if we wake up normally. 
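+    /// Returns `false` if the deadline `end` passes before this token is signalled.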
+ pub fn wait_max_until(self, end: Instant) -> bool { + while !self.inner.woken.load(Ordering::SeqCst) { + let now = Instant::now(); + if now >= end { + return false; + } + thread::park_timeout(end - now) + } + true + } +} diff --git a/library/std/src/sync/mpsc/cache_aligned.rs b/library/std/src/sync/mpsc/cache_aligned.rs new file mode 100644 index 000000000..9197f0d6e --- /dev/null +++ b/library/std/src/sync/mpsc/cache_aligned.rs @@ -0,0 +1,25 @@ +use crate::ops::{Deref, DerefMut}; + +#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[cfg_attr(target_arch = "aarch64", repr(align(128)))] +#[cfg_attr(not(target_arch = "aarch64"), repr(align(64)))] +pub(super) struct CacheAligned(pub T); + +impl Deref for CacheAligned { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for CacheAligned { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl CacheAligned { + pub(super) fn new(t: T) -> Self { + CacheAligned(t) + } +} diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs new file mode 100644 index 000000000..e85a87239 --- /dev/null +++ b/library/std/src/sync/mpsc/mod.rs @@ -0,0 +1,1669 @@ +//! Multi-producer, single-consumer FIFO queue communication primitives. +//! +//! This module provides message-based communication over channels, concretely +//! defined among three types: +//! +//! * [`Sender`] +//! * [`SyncSender`] +//! * [`Receiver`] +//! +//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both +//! senders are clone-able (multi-producer) such that many threads can send +//! simultaneously to one receiver (single-consumer). +//! +//! These channels come in two flavors: +//! +//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function +//! will return a `(Sender, Receiver)` tuple where all sends will be +//! **asynchronous** (they never block). The channel conceptually has an +//! infinite buffer. +//! +//! 2. A synchronous, bounded channel. The [`sync_channel`] function will +//! return a `(SyncSender, Receiver)` tuple where the storage for pending +//! messages is a pre-allocated buffer of a fixed size. All sends will be +//! **synchronous** by blocking until there is buffer space available. Note +//! that a bound of 0 is allowed, causing the channel to become a "rendezvous" +//! channel where each sender atomically hands off a message to a receiver. +//! +//! [`send`]: Sender::send +//! +//! ## Disconnection +//! +//! The send and receive operations on channels will all return a [`Result`] +//! indicating whether the operation succeeded or not. An unsuccessful operation +//! is normally indicative of the other half of a channel having "hung up" by +//! being dropped in its corresponding thread. +//! +//! Once half of a channel has been deallocated, most operations can no longer +//! continue to make progress, so [`Err`] will be returned. Many applications +//! will continue to [`unwrap`] the results returned from this module, +//! instigating a propagation of failure among threads if one unexpectedly dies. +//! +//! [`unwrap`]: Result::unwrap +//! +//! # Examples +//! +//! Simple usage: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::channel; +//! +//! // Create a simple streaming channel +//! let (tx, rx) = channel(); +//! thread::spawn(move|| { +//! tx.send(10).unwrap(); +//! }); +//! assert_eq!(rx.recv().unwrap(), 10); +//! ``` +//! +//! Shared usage: +//! +//! ``` +//! use std::thread; +//! 
use std::sync::mpsc::channel; +//! +//! // Create a shared channel that can be sent along from many threads +//! // where tx is the sending half (tx for transmission), and rx is the receiving +//! // half (rx for receiving). +//! let (tx, rx) = channel(); +//! for i in 0..10 { +//! let tx = tx.clone(); +//! thread::spawn(move|| { +//! tx.send(i).unwrap(); +//! }); +//! } +//! +//! for _ in 0..10 { +//! let j = rx.recv().unwrap(); +//! assert!(0 <= j && j < 10); +//! } +//! ``` +//! +//! Propagating panics: +//! +//! ``` +//! use std::sync::mpsc::channel; +//! +//! // The call to recv() will return an error because the channel has already +//! // hung up (or been deallocated) +//! let (tx, rx) = channel::(); +//! drop(tx); +//! assert!(rx.recv().is_err()); +//! ``` +//! +//! Synchronous channels: +//! +//! ``` +//! use std::thread; +//! use std::sync::mpsc::sync_channel; +//! +//! let (tx, rx) = sync_channel::(0); +//! thread::spawn(move|| { +//! // This will wait for the parent thread to start receiving +//! tx.send(53).unwrap(); +//! }); +//! rx.recv().unwrap(); +//! ``` +//! +//! Unbounded receive loop: +//! +//! ``` +//! use std::sync::mpsc::sync_channel; +//! use std::thread; +//! +//! let (tx, rx) = sync_channel(3); +//! +//! for _ in 0..3 { +//! // It would be the same without thread and clone here +//! // since there will still be one `tx` left. +//! let tx = tx.clone(); +//! // cloned tx dropped within thread +//! thread::spawn(move || tx.send("ok").unwrap()); +//! } +//! +//! // Drop the last sender to stop `rx` waiting for message. +//! // The program will not complete if we comment this out. +//! // **All** `tx` needs to be dropped for `rx` to have `Err`. +//! drop(tx); +//! +//! // Unbounded receiver waiting for all senders to complete. +//! while let Ok(msg) = rx.recv() { +//! println!("{msg}"); +//! } +//! +//! println!("completed"); +//! ``` + +#![stable(feature = "rust1", since = "1.0.0")] + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +#[cfg(all(test, not(target_os = "emscripten")))] +mod sync_tests; + +// A description of how Rust's channel implementation works +// +// Channels are supposed to be the basic building block for all other +// concurrent primitives that are used in Rust. As a result, the channel type +// needs to be highly optimized, flexible, and broad enough for use everywhere. +// +// The choice of implementation of all channels is to be built on lock-free data +// structures. The channels themselves are then consequently also lock-free data +// structures. As always with lock-free code, this is a very "here be dragons" +// territory, especially because I'm unaware of any academic papers that have +// gone into great length about channels of these flavors. +// +// ## Flavors of channels +// +// From the perspective of a consumer of this library, there is only one flavor +// of channel. This channel can be used as a stream and cloned to allow multiple +// senders. Under the hood, however, there are actually three flavors of +// channels in play. +// +// * Flavor::Oneshots - these channels are highly optimized for the one-send use +// case. They contain as few atomics as possible and +// involve one and exactly one allocation. +// * Streams - these channels are optimized for the non-shared use case. They +// use a different concurrent queue that is more tailored for this +// use case. The initial allocation of this flavor of channel is not +// optimized. 
+// * Shared - this is the most general form of channel that this module offers, +// a channel with multiple senders. This type is as optimized as it +// can be, but the previous two types mentioned are much faster for +// their use-cases. +// +// ## Concurrent queues +// +// The basic idea of Rust's Sender/Receiver types is that send() never blocks, +// but recv() obviously blocks. This means that under the hood there must be +// some shared and concurrent queue holding all of the actual data. +// +// With two flavors of channels, two flavors of queues are also used. We have +// chosen to use queues from a well-known author that are abbreviated as SPSC +// and MPSC (single producer, single consumer and multiple producer, single +// consumer). SPSC queues are used for streams while MPSC queues are used for +// shared channels. +// +// ### SPSC optimizations +// +// The SPSC queue found online is essentially a linked list of nodes where one +// half of the nodes are the "queue of data" and the other half of nodes are a +// cache of unused nodes. The unused nodes are used such that an allocation is +// not required on every push() and a free doesn't need to happen on every +// pop(). +// +// As found online, however, the cache of nodes is of an infinite size. This +// means that if a channel at one point in its life had 50k items in the queue, +// then the queue will always have the capacity for 50k items. I believed that +// this was an unnecessary limitation of the implementation, so I have altered +// the queue to optionally have a bound on the cache size. +// +// By default, streams will have an unbounded SPSC queue with a small-ish cache +// size. The hope is that the cache is still large enough to have very fast +// send() operations while not too large such that millions of channels can +// coexist at once. +// +// ### MPSC optimizations +// +// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses +// a linked list under the hood to earn its unboundedness, but I have not put +// forth much effort into having a cache of nodes similar to the SPSC queue. +// +// For now, I believe that this is "ok" because shared channels are not the most +// common type, but soon we may wish to revisit this queue choice and determine +// another candidate for backend storage of shared channels. +// +// ## Overview of the Implementation +// +// Now that there's a little background on the concurrent queues used, it's +// worth going into much more detail about the channels themselves. The basic +// pseudocode for a send/recv are: +// +// +// send(t) recv() +// queue.push(t) return if queue.pop() +// if increment() == -1 deschedule { +// wakeup() if decrement() > 0 +// cancel_deschedule() +// } +// queue.pop() +// +// As mentioned before, there are no locks in this implementation, only atomic +// instructions are used. +// +// ### The internal atomic counter +// +// Every channel has a shared counter with each half to keep track of the size +// of the queue. This counter is used to abort descheduling by the receiver and +// to know when to wake up on the sending side. +// +// As seen in the pseudocode, senders will increment this count and receivers +// will decrement the count. The theory behind this is that if a sender sees a +// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, +// then it doesn't need to block. +// +// The recv() method has a beginning call to pop(), and if successful, it needs +// to decrement the count. 
It is a crucial implementation detail that this +// decrement does *not* happen to the shared counter. If this were the case, +// then it would be possible for the counter to be very negative when there were +// no receivers waiting, in which case the senders would have to determine when +// it was actually appropriate to wake up a receiver. +// +// Instead, the "steal count" is kept track of separately (not atomically +// because it's only used by receivers), and then the decrement() call when +// descheduling will lump in all of the recent steals into one large decrement. +// +// The implication of this is that if a sender sees a -1 count, then there's +// guaranteed to be a waiter waiting! +// +// ## Native Implementation +// +// A major goal of these channels is to work seamlessly on and off the runtime. +// All of the previous race conditions have been worded in terms of +// scheduler-isms (which is obviously not available without the runtime). +// +// For now, native usage of channels (off the runtime) will fall back onto +// mutexes/cond vars for descheduling/atomic decisions. The no-contention path +// is still entirely lock-free, the "deschedule" blocks above are surrounded by +// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a +// condition variable. +// +// ## Select +// +// Being able to support selection over channels has greatly influenced this +// design, and not only does selection need to work inside the runtime, but also +// outside the runtime. +// +// The implementation is fairly straightforward. The goal of select() is not to +// return some data, but only to return which channel can receive data without +// blocking. The implementation is essentially the entire blocking procedure +// followed by an increment as soon as its woken up. The cancellation procedure +// involves an increment and swapping out of to_wake to acquire ownership of the +// thread to unblock. +// +// Sadly this current implementation requires multiple allocations, so I have +// seen the throughput of select() be much worse than it should be. I do not +// believe that there is anything fundamental that needs to change about these +// channels, however, in order to support a more efficient select(). +// +// FIXME: Select is now removed, so these factors are ready to be cleaned up! +// +// # Conclusion +// +// And now that you've seen all the races that I found and attempted to fix, +// here's the code for you to find some more! + +use crate::cell::UnsafeCell; +use crate::error; +use crate::fmt; +use crate::mem; +use crate::sync::Arc; +use crate::time::{Duration, Instant}; + +mod blocking; +mod mpsc_queue; +mod oneshot; +mod shared; +mod spsc_queue; +mod stream; +mod sync; + +mod cache_aligned; + +/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. +/// This half can only be owned by one thread. +/// +/// Messages sent to the channel can be retrieved using [`recv`]. 
+/// +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send("Hello world!").unwrap(); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// send.send("Delayed for 2 seconds").unwrap(); +/// }); +/// +/// println!("{}", recv.recv().unwrap()); // Received immediately +/// println!("Waiting..."); +/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")] +pub struct Receiver { + inner: UnsafeCell>, +} + +// The receiver port can be sent from place to place, so long as it +// is not used to receive non-sendable things. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Receiver {} + +#[stable(feature = "rust1", since = "1.0.0")] +impl !Sync for Receiver {} + +/// An iterator over messages on a [`Receiver`], created by [`iter`]. +/// +/// This iterator will block whenever [`next`] is called, +/// waiting for a new message, and [`None`] will be returned +/// when the corresponding channel has hung up. +/// +/// [`iter`]: Receiver::iter +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(Debug)] +pub struct Iter<'a, T: 'a> { + rx: &'a Receiver, +} + +/// An iterator that attempts to yield all pending values for a [`Receiver`], +/// created by [`try_iter`]. +/// +/// [`None`] will be returned when there are no pending values remaining or +/// if the corresponding channel has hung up. +/// +/// This iterator will never block the caller in order to wait for data to +/// become available. Instead, it will return [`None`]. +/// +/// [`try_iter`]: Receiver::try_iter +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (sender, receiver) = channel(); +/// +/// // Nothing is in the buffer yet +/// assert!(receiver.try_iter().next().is_none()); +/// println!("Nothing in the buffer..."); +/// +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// sender.send(2).unwrap(); +/// sender.send(3).unwrap(); +/// }); +/// +/// println!("Going to sleep..."); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// +/// for x in receiver.try_iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[stable(feature = "receiver_try_iter", since = "1.15.0")] +#[derive(Debug)] +pub struct TryIter<'a, T: 'a> { + rx: &'a Receiver, +} + +/// An owning iterator over messages on a [`Receiver`], +/// created by [`into_iter`]. +/// +/// This iterator will block whenever [`next`] +/// is called, waiting for a new message, and [`None`] will be +/// returned if the corresponding channel has hung up. 
+/// +/// [`into_iter`]: Receiver::into_iter +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.into_iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +#[derive(Debug)] +pub struct IntoIter { + rx: Receiver, +} + +/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be +/// owned by one thread, but it can be cloned to send to other threads. +/// +/// Messages can be sent through this channel with [`send`]. +/// +/// Note: all senders (the original and the clones) need to be dropped for the receiver +/// to stop blocking to receive messages with [`Receiver::recv`]. +/// +/// [`send`]: Sender::send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// let sender2 = sender.clone(); +/// +/// // First thread owns sender +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// }); +/// +/// // Second thread owns sender2 +/// thread::spawn(move || { +/// sender2.send(2).unwrap(); +/// }); +/// +/// let msg = receiver.recv().unwrap(); +/// let msg2 = receiver.recv().unwrap(); +/// +/// assert_eq!(3, msg + msg2); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Sender { + inner: UnsafeCell>, +} + +// The send port can be sent from place to place, so long as it +// is not used to send non-sendable things. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Sender {} + +#[stable(feature = "rust1", since = "1.0.0")] +impl !Sync for Sender {} + +/// The sending-half of Rust's synchronous [`sync_channel`] type. +/// +/// Messages can be sent through this channel with [`send`] or [`try_send`]. +/// +/// [`send`] will block if there is no space in the internal buffer. +/// +/// [`send`]: SyncSender::send +/// [`try_send`]: SyncSender::try_send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// // Create a sync_channel with buffer size 2 +/// let (sync_sender, receiver) = sync_channel(2); +/// let sync_sender2 = sync_sender.clone(); +/// +/// // First thread owns sync_sender +/// thread::spawn(move || { +/// sync_sender.send(1).unwrap(); +/// sync_sender.send(2).unwrap(); +/// }); +/// +/// // Second thread owns sync_sender2 +/// thread::spawn(move || { +/// sync_sender2.send(3).unwrap(); +/// // thread will now block since the buffer is full +/// println!("Thread unblocked!"); +/// }); +/// +/// let mut msg; +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {msg} received"); +/// +/// // "Thread unblocked!" will be printed now +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {msg} received"); +/// +/// msg = receiver.recv().unwrap(); +/// +/// println!("message {msg} received"); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct SyncSender { + inner: Arc>, +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for SyncSender {} + +/// An error returned from the [`Sender::send`] or [`SyncSender::send`] +/// function on **channel**s. +/// +/// A **send** operation can only fail if the receiving end of a channel is +/// disconnected, implying that the data could never be received. 
The error +/// contains the data being sent as a payload so it can be recovered. +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError(#[stable(feature = "rust1", since = "1.0.0")] pub T); + +/// An error returned from the [`recv`] function on a [`Receiver`]. +/// +/// The [`recv`] operation can only fail if the sending half of a +/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further +/// messages will ever be received. +/// +/// [`recv`]: Receiver::recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RecvError; + +/// This enumeration is the list of the possible reasons that [`try_recv`] could +/// not return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`try_recv`]: Receiver::try_recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub enum TryRecvError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + #[stable(feature = "rust1", since = "1.0.0")] + Empty, + + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + #[stable(feature = "rust1", since = "1.0.0")] + Disconnected, +} + +/// This enumeration is the list of possible errors that made [`recv_timeout`] +/// unable to return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`recv_timeout`]: Receiver::recv_timeout +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] +pub enum RecvTimeoutError { + /// This **channel** is currently empty, but the **Sender**(s) have not yet + /// disconnected, so data may yet become available. + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + Timeout, + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + Disconnected, +} + +/// This enumeration is the list of the possible error outcomes for the +/// [`try_send`] method. +/// +/// [`try_send`]: SyncSender::try_send +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum TrySendError { + /// The data could not be sent on the [`sync_channel`] because it would require that + /// the callee block to send the data. + /// + /// If this is a buffered channel, then the buffer is full at this time. If + /// this is not a buffered channel, then there is no [`Receiver`] available to + /// acquire the data. + #[stable(feature = "rust1", since = "1.0.0")] + Full(#[stable(feature = "rust1", since = "1.0.0")] T), + + /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be + /// sent. The data is returned back to the callee in this case. 
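+    ///
+    /// For example, the rejected value can be taken back out of the error
+    /// (the `i32` channel and the value `5` below are only illustrative):
+    ///
+    /// ```
+    /// use std::sync::mpsc::{sync_channel, TrySendError};
+    ///
+    /// let (sync_sender, receiver) = sync_channel::<i32>(1);
+    /// drop(receiver);
+    ///
+    /// match sync_sender.try_send(5) {
+    ///     Err(TrySendError::Disconnected(value)) => assert_eq!(value, 5),
+    ///     _ => unreachable!(),
+    /// }
+    /// ```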
+ #[stable(feature = "rust1", since = "1.0.0")] + Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), +} + +enum Flavor { + Oneshot(Arc>), + Stream(Arc>), + Shared(Arc>), + Sync(Arc>), +} + +#[doc(hidden)] +trait UnsafeFlavor { + fn inner_unsafe(&self) -> &UnsafeCell>; + unsafe fn inner_mut(&self) -> &mut Flavor { + &mut *self.inner_unsafe().get() + } + unsafe fn inner(&self) -> &Flavor { + &*self.inner_unsafe().get() + } +} +impl UnsafeFlavor for Sender { + fn inner_unsafe(&self) -> &UnsafeCell> { + &self.inner + } +} +impl UnsafeFlavor for Receiver { + fn inner_unsafe(&self) -> &UnsafeCell> { + &self.inner + } +} + +/// Creates a new asynchronous channel, returning the sender/receiver halves. +/// All data sent on the [`Sender`] will become available on the [`Receiver`] in +/// the same order as it was sent, and no [`send`] will block the calling thread +/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will +/// block after its buffer limit is reached). [`recv`] will block until a message +/// is available while there is at least one [`Sender`] alive (including clones). +/// +/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but +/// only one [`Receiver`] is supported. +/// +/// If the [`Receiver`] is disconnected while trying to [`send`] with the +/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the +/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will +/// return a [`RecvError`]. +/// +/// [`send`]: Sender::send +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// +/// // Spawn off an expensive computation +/// thread::spawn(move|| { +/// # fn expensive_computation() {} +/// sender.send(expensive_computation()).unwrap(); +/// }); +/// +/// // Do some useful work for awhile +/// +/// // Let's see what that answer was +/// println!("{:?}", receiver.recv().unwrap()); +/// ``` +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn channel() -> (Sender, Receiver) { + let a = Arc::new(oneshot::Packet::new()); + (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) +} + +/// Creates a new synchronous, bounded channel. +/// All data sent on the [`SyncSender`] will become available on the [`Receiver`] +/// in the same order as it was sent. Like asynchronous [`channel`]s, the +/// [`Receiver`] will block until a message becomes available. `sync_channel` +/// differs greatly in the semantics of the sender, however. +/// +/// This channel has an internal buffer on which messages will be queued. +/// `bound` specifies the buffer size. When the internal buffer becomes full, +/// future sends will *block* waiting for the buffer to open up. Note that a +/// buffer size of 0 is valid, in which case this becomes "rendezvous channel" +/// where each [`send`] will not return until a [`recv`] is paired with it. +/// +/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple +/// times, but only one [`Receiver`] is supported. +/// +/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying +/// to [`send`] with the [`SyncSender`], the [`send`] method will return a +/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying +/// to [`recv`], the [`recv`] method will return a [`RecvError`]. 
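+///
+/// For example, both directions of disconnection can be observed (an `i32`
+/// channel is used here purely for illustration):
+///
+/// ```
+/// use std::sync::mpsc::sync_channel;
+///
+/// // Receiver goes away first: `send` returns the value inside the error.
+/// let (tx, rx) = sync_channel::<i32>(1);
+/// drop(rx);
+/// assert_eq!(tx.send(1).unwrap_err().0, 1);
+///
+/// // Senders go away first: buffered data is still delivered, then `recv` errors.
+/// let (tx, rx) = sync_channel::<i32>(1);
+/// tx.send(2).unwrap();
+/// drop(tx);
+/// assert_eq!(rx.recv(), Ok(2));
+/// assert!(rx.recv().is_err());
+/// ```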
+/// +/// [`send`]: SyncSender::send +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// let (sender, receiver) = sync_channel(1); +/// +/// // this returns immediately +/// sender.send(1).unwrap(); +/// +/// thread::spawn(move|| { +/// // this will block until the previous message has been received +/// sender.send(2).unwrap(); +/// }); +/// +/// assert_eq!(receiver.recv().unwrap(), 1); +/// assert_eq!(receiver.recv().unwrap(), 2); +/// ``` +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn sync_channel(bound: usize) -> (SyncSender, Receiver) { + let a = Arc::new(sync::Packet::new(bound)); + (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) +} + +//////////////////////////////////////////////////////////////////////////////// +// Sender +//////////////////////////////////////////////////////////////////////////////// + +impl Sender { + fn new(inner: Flavor) -> Sender { + Sender { inner: UnsafeCell::new(inner) } + } + + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of [`Err`] means that the data will never be + /// received, but a return value of [`Ok`] does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns [`Ok`]. + /// + /// This method will never block the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc::channel; + /// + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// tx.send(1).unwrap(); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send(1).unwrap_err().0, 1); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn send(&self, t: T) -> Result<(), SendError> { + let (new_inner, ret) = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + if !p.sent() { + return p.send(t).map_err(SendError); + } else { + let a = Arc::new(stream::Packet::new()); + let rx = Receiver::new(Flavor::Stream(a.clone())); + match p.upgrade(rx) { + oneshot::UpSuccess => { + let ret = a.send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is + // asleep (we're looking at it), so the receiver + // can't go away. + a.send(t).ok().unwrap(); + token.signal(); + (a, Ok(())) + } + } + } + } + Flavor::Stream(ref p) => return p.send(t).map_err(SendError), + Flavor::Shared(ref p) => return p.send(t).map_err(SendError), + Flavor::Sync(..) => unreachable!(), + }; + + unsafe { + let tmp = Sender::new(Flavor::Stream(new_inner)); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + ret.map_err(SendError) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Clone for Sender { + /// Clone a sender to send to other threads. + /// + /// Note, be aware of the lifetime of the sender because all senders + /// (including the original) need to be dropped in order for + /// [`Receiver::recv`] to stop blocking. 
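+    ///
+    /// # Examples
+    ///
+    /// A small sketch of the note above: every sender, the original and the
+    /// clones alike, has to be dropped before the receiver reports
+    /// disconnection.
+    ///
+    /// ```
+    /// use std::sync::mpsc::channel;
+    ///
+    /// let (tx, rx) = channel::<i32>();
+    /// let tx2 = tx.clone();
+    ///
+    /// tx.send(1).unwrap();
+    /// tx2.send(2).unwrap();
+    ///
+    /// // Both the original sender and its clone must go away before `recv`
+    /// // reports that the channel has hung up.
+    /// drop(tx);
+    /// drop(tx2);
+    ///
+    /// // Messages already sent are still delivered, in order, first.
+    /// assert_eq!(rx.recv(), Ok(1));
+    /// assert_eq!(rx.recv(), Ok(2));
+    /// assert!(rx.recv().is_err());
+    /// ```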
+ fn clone(&self) -> Sender { + let packet = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => { + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); + let rx = Receiver::new(Flavor::Shared(a.clone())); + let sleeper = match p.upgrade(rx) { + oneshot::UpSuccess | oneshot::UpDisconnected => None, + oneshot::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); + } + a + } + Flavor::Stream(ref p) => { + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); + let rx = Receiver::new(Flavor::Shared(a.clone())); + let sleeper = match p.upgrade(rx) { + stream::UpSuccess | stream::UpDisconnected => None, + stream::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); + } + a + } + Flavor::Shared(ref p) => { + p.clone_chan(); + return Sender::new(Flavor::Shared(p.clone())); + } + Flavor::Sync(..) => unreachable!(), + }; + + unsafe { + let tmp = Sender::new(Flavor::Shared(packet.clone())); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + Sender::new(Flavor::Shared(packet)) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for Sender { + fn drop(&mut self) { + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_chan(), + Flavor::Stream(ref p) => p.drop_chan(), + Flavor::Shared(ref p) => p.drop_chan(), + Flavor::Sync(..) => unreachable!(), + } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Sender").finish_non_exhaustive() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// SyncSender +//////////////////////////////////////////////////////////////////////////////// + +impl SyncSender { + fn new(inner: Arc>) -> SyncSender { + SyncSender { inner } + } + + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + /// + /// Note that a successful send does *not* guarantee that the receiver will + /// ever see the data if there is a buffer on this channel. Items may be + /// enqueued in the internal buffer for the receiver to receive at a later + /// time. If the buffer size is 0, however, the channel becomes a rendezvous + /// channel and it guarantees that the receiver has indeed received + /// the data if this function returns success. + /// + /// This function will never panic, but it may return [`Err`] if the + /// [`Receiver`] has disconnected and is no longer able to receive + /// information. + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a rendezvous sync_channel with buffer size 0 + /// let (sync_sender, receiver) = sync_channel(0); + /// + /// thread::spawn(move || { + /// println!("sending message..."); + /// sync_sender.send(1).unwrap(); + /// // Thread is now blocked until the message is received + /// + /// println!("...message received!"); + /// }); + /// + /// let msg = receiver.recv().unwrap(); + /// assert_eq!(1, msg); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn send(&self, t: T) -> Result<(), SendError> { + self.inner.send(t).map_err(SendError) + } + + /// Attempts to send a value on this channel without blocking. 
+ /// + /// This method differs from [`send`] by returning immediately if the + /// channel's buffer is full or no receiver is waiting to acquire some + /// data. Compared with [`send`], this function has two failure cases + /// instead of one (one for disconnection, one for a full buffer). + /// + /// See [`send`] for notes about guarantees of whether the + /// receiver has received the data or not if this function is successful. + /// + /// [`send`]: Self::send + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a sync_channel with buffer size 1 + /// let (sync_sender, receiver) = sync_channel(1); + /// let sync_sender2 = sync_sender.clone(); + /// + /// // First thread owns sync_sender + /// thread::spawn(move || { + /// sync_sender.send(1).unwrap(); + /// sync_sender.send(2).unwrap(); + /// // Thread blocked + /// }); + /// + /// // Second thread owns sync_sender2 + /// thread::spawn(move || { + /// // This will return an error and send + /// // no message if the buffer is full + /// let _ = sync_sender2.try_send(3); + /// }); + /// + /// let mut msg; + /// msg = receiver.recv().unwrap(); + /// println!("message {msg} received"); + /// + /// msg = receiver.recv().unwrap(); + /// println!("message {msg} received"); + /// + /// // Third message may have never been sent + /// match receiver.try_recv() { + /// Ok(msg) => println!("message {msg} received"), + /// Err(_) => println!("the third message was never sent"), + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_send(&self, t: T) -> Result<(), TrySendError> { + self.inner.try_send(t) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Clone for SyncSender { + fn clone(&self) -> SyncSender { + self.inner.clone_chan(); + SyncSender::new(self.inner.clone()) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for SyncSender { + fn drop(&mut self) { + self.inner.drop_chan(); + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl fmt::Debug for SyncSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SyncSender").finish_non_exhaustive() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Receiver +//////////////////////////////////////////////////////////////////////////////// + +impl Receiver { + fn new(inner: Flavor) -> Receiver { + Receiver { inner: UnsafeCell::new(inner) } + } + + /// Attempts to return a pending value on this receiver without blocking. + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with a + /// possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with [`recv`], this function has two failure cases instead of one + /// (one for disconnection, one for an empty buffer). 
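+    ///
+    /// For instance, a polling caller can tell the two failures apart (again,
+    /// an `i32` channel is used only for illustration):
+    ///
+    /// ```
+    /// use std::sync::mpsc::{channel, TryRecvError};
+    ///
+    /// let (tx, rx) = channel::<i32>();
+    ///
+    /// // Nothing has been sent yet, but a sender is still alive.
+    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+    ///
+    /// tx.send(7).unwrap();
+    /// assert_eq!(rx.try_recv(), Ok(7));
+    ///
+    /// // Once every sender is gone, the error becomes `Disconnected`.
+    /// drop(tx);
+    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+    /// ```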
+ /// + /// [`recv`]: Self::recv + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::{Receiver, channel}; + /// + /// let (_, receiver): (_, Receiver) = channel(); + /// + /// assert!(receiver.try_recv().is_err()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_recv(&self) -> Result { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(TryRecvError::Empty), + Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected), + Err(oneshot::Upgraded(rx)) => rx, + }, + Flavor::Stream(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(TryRecvError::Empty), + Err(stream::Disconnected) => return Err(TryRecvError::Disconnected), + Err(stream::Upgraded(rx)) => rx, + }, + Flavor::Shared(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(TryRecvError::Empty), + Err(shared::Disconnected) => return Err(TryRecvError::Disconnected), + }, + Flavor::Sync(ref p) => match p.try_recv() { + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(TryRecvError::Empty), + Err(sync::Disconnected) => return Err(TryRecvError::Disconnected), + }, + }; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent (at least one sender + /// still exists). Once a message is sent to the corresponding [`Sender`] + /// (or [`SyncSender`]), this receiver will wake up and return that + /// message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc; + /// use std::thread; + /// + /// let (send, recv) = mpsc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// ``` + /// + /// Buffering behavior: + /// + /// ``` + /// use std::sync::mpsc; + /// use std::thread; + /// use std::sync::mpsc::RecvError; + /// + /// let (send, recv) = mpsc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// drop(send); + /// }); + /// + /// // wait for the thread to join so we ensure the sender is dropped + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// assert_eq!(Ok(2), recv.recv()); + /// assert_eq!(Ok(3), recv.recv()); + /// assert_eq!(Err(RecvError), recv.recv()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn recv(&self) -> Result { + loop { + let new_port = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(RecvError), + Err(oneshot::Upgraded(rx)) => rx, + Err(oneshot::Empty) => unreachable!(), + }, + Flavor::Stream(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(RecvError), + Err(stream::Upgraded(rx)) => rx, + Err(stream::Empty) => unreachable!(), + }, + Flavor::Shared(ref p) => match p.recv(None) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(RecvError), + Err(shared::Empty) => unreachable!(), + }, + Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), + }; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if it waits more than `timeout`. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent (at least one sender + /// still exists). Once a message is sent to the corresponding [`Sender`] + /// (or [`SyncSender`]), this receiver will wake up and return that + /// message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. 
+ /// + /// # Known Issues + /// + /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout` + /// to panic unexpectedly with the following example: + /// + /// ```no_run + /// use std::sync::mpsc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (tx, rx) = channel::(); + /// + /// thread::spawn(move || { + /// let d = Duration::from_millis(10); + /// loop { + /// println!("recv"); + /// let _r = rx.recv_timeout(d); + /// } + /// }); + /// + /// thread::sleep(Duration::from_millis(100)); + /// let _c1 = tx.clone(); + /// + /// thread::sleep(Duration::from_secs(1)); + /// ``` + /// + /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364 + /// + /// # Examples + /// + /// Successfully receiving value before encountering timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] + pub fn recv_timeout(&self, timeout: Duration) -> Result { + // Do an optimistic try_recv to avoid the performance impact of + // Instant::now() in the full-channel case. + match self.try_recv() { + Ok(result) => Ok(result), + Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected), + Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + // So far in the future that it's practically the same as waiting indefinitely. + None => self.recv().map_err(RecvTimeoutError::from), + }, + } + } + + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if `deadline` is reached. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. 
+ /// + /// # Examples + /// + /// Successfully receiving value before reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[unstable(feature = "deadline_api", issue = "46316")] + pub fn recv_deadline(&self, deadline: Instant) -> Result { + use self::RecvTimeoutError::*; + + loop { + let port_or_empty = match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(oneshot::Disconnected) => return Err(Disconnected), + Err(oneshot::Upgraded(rx)) => Some(rx), + Err(oneshot::Empty) => None, + }, + Flavor::Stream(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(stream::Disconnected) => return Err(Disconnected), + Err(stream::Upgraded(rx)) => Some(rx), + Err(stream::Empty) => None, + }, + Flavor::Shared(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(shared::Disconnected) => return Err(Disconnected), + Err(shared::Empty) => None, + }, + Flavor::Sync(ref p) => match p.recv(Some(deadline)) { + Ok(t) => return Ok(t), + Err(sync::Disconnected) => return Err(Disconnected), + Err(sync::Empty) => None, + }, + }; + + if let Some(new_port) = port_or_empty { + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + + // If we're already passed the deadline, and we're here without + // data, return a timeout, else try again. + if Instant::now() >= deadline { + return Err(Timeout); + } + } + } + + /// Returns an iterator that will block waiting for messages, but never + /// [`panic!`]. It will return [`None`] when the channel has hung up. + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::channel; + /// use std::thread; + /// + /// let (send, recv) = channel(); + /// + /// thread::spawn(move || { + /// send.send(1).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// }); + /// + /// let mut iter = recv.iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn iter(&self) -> Iter<'_, T> { + Iter { rx: self } + } + + /// Returns an iterator that will attempt to yield all pending values. + /// It will return `None` if there are no more pending values or if the + /// channel has hung up. The iterator will never [`panic!`] or block the + /// user by waiting for values. 
+ /// + /// # Examples + /// + /// ```no_run + /// use std::sync::mpsc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (sender, receiver) = channel(); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// sender.send(1).unwrap(); + /// sender.send(2).unwrap(); + /// sender.send(3).unwrap(); + /// }); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// // block for two seconds + /// thread::sleep(Duration::from_secs(2)); + /// + /// let mut iter = receiver.try_iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[stable(feature = "receiver_try_iter", since = "1.15.0")] + pub fn try_iter(&self) -> TryIter<'_, T> { + TryIter { rx: self } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.rx.recv().ok() + } +} + +#[stable(feature = "receiver_try_iter", since = "1.15.0")] +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.rx.try_recv().ok() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl<'a, T> IntoIterator for &'a Receiver { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl Iterator for IntoIter { + type Item = T; + fn next(&mut self) -> Option { + self.rx.recv().ok() + } +} + +#[stable(feature = "receiver_into_iter", since = "1.1.0")] +impl IntoIterator for Receiver { + type Item = T; + type IntoIter = IntoIter; + + fn into_iter(self) -> IntoIter { + IntoIter { rx: self } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for Receiver { + fn drop(&mut self) { + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_port(), + Flavor::Stream(ref p) => p.drop_port(), + Flavor::Shared(ref p) => p.drop_port(), + Flavor::Sync(ref p) => p.drop_port(), + } + } +} + +#[stable(feature = "mpsc_debug", since = "1.8.0")] +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Receiver").finish_non_exhaustive() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendError").finish_non_exhaustive() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a closed channel".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for SendError { + #[allow(deprecated)] + fn description(&self) -> &str { + "sending on a closed channel" + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => "Full(..)".fmt(f), + TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) 
=> "sending on a full channel".fmt(f), + TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for TrySendError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + TrySendError::Full(..) => "sending on a full channel", + TrySendError::Disconnected(..) => "sending on a closed channel", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From> for TrySendError { + /// Converts a `SendError` into a `TrySendError`. + /// + /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError`. + /// + /// No data is allocated on the heap. + fn from(err: SendError) -> TrySendError { + match err { + SendError(t) => TrySendError::Disconnected(t), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "receiving on a closed channel".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for RecvError { + #[allow(deprecated)] + fn description(&self) -> &str { + "receiving on a closed channel" + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => "receiving on an empty channel".fmt(f), + TryRecvError::Disconnected => "receiving on a closed channel".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl error::Error for TryRecvError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From for TryRecvError { + /// Converts a `RecvError` into a `TryRecvError`. + /// + /// This conversion always returns `TryRecvError::Disconnected`. + /// + /// No data is allocated on the heap. + fn from(err: RecvError) -> TryRecvError { + match err { + RecvError => TryRecvError::Disconnected, + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f), + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f), + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] +impl error::Error for RecvTimeoutError { + #[allow(deprecated)] + fn description(&self) -> &str { + match *self { + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", + } + } +} + +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From for RecvTimeoutError { + /// Converts a `RecvError` into a `RecvTimeoutError`. + /// + /// This conversion always returns `RecvTimeoutError::Disconnected`. + /// + /// No data is allocated on the heap. + fn from(err: RecvError) -> RecvTimeoutError { + match err { + RecvError => RecvTimeoutError::Disconnected, + } + } +} diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs new file mode 100644 index 000000000..cdd64a5de --- /dev/null +++ b/library/std/src/sync/mpsc/mpsc_queue.rs @@ -0,0 +1,117 @@ +//! 
A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between threads, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue might not be appropriate for all use-cases. + +// https://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +pub use self::PopResult::*; + +use core::cell::UnsafeCell; +use core::ptr; + +use crate::boxed::Box; +use crate::sync::atomic::{AtomicPtr, Ordering}; + +/// A result of the `pop` function. +pub enum PopResult { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +struct Node { + next: AtomicPtr>, + value: Option, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue { + head: AtomicPtr>, + tail: UnsafeCell<*mut Node>, +} + +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + +impl Node { + unsafe fn new(v: Option) -> *mut Node { + Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v }) + } +} + +impl Queue { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } + } + + /// Pushes a new value onto this queue. + pub fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, Ordering::AcqRel); + (*prev).next.store(n, Ordering::Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. 
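+    ///
+    /// Callers are therefore expected to treat `Inconsistent` as "try again
+    /// shortly". A rough, purely illustrative sketch of such a consumer loop
+    /// (the `drain_one` helper is hypothetical, not part of this module):
+    ///
+    /// ```ignore (cannot-doctest-private-modules)
+    /// use crate::thread;
+    ///
+    /// fn drain_one<T>(q: &Queue<T>) -> Option<T> {
+    ///     loop {
+    ///         match q.pop() {
+    ///             Data(t) => return Some(t),
+    ///             Empty => return None,
+    ///             // A pusher was pre-empted mid-push; yield and retry.
+    ///             Inconsistent => thread::yield_now(),
+    ///         }
+    ///     }
+    /// }
+    /// ```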
+ pub fn pop(&self) -> PopResult { + unsafe { + let tail = *self.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + let _: Box> = Box::from_raw(tail); + return Data(ret); + } + + if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent } + } + } +} + +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + let _: Box> = Box::from_raw(cur); + cur = next; + } + } + } +} diff --git a/library/std/src/sync/mpsc/mpsc_queue/tests.rs b/library/std/src/sync/mpsc/mpsc_queue/tests.rs new file mode 100644 index 000000000..9f4f31ed0 --- /dev/null +++ b/library/std/src/sync/mpsc/mpsc_queue/tests.rs @@ -0,0 +1,47 @@ +use super::{Data, Empty, Inconsistent, Queue}; +use crate::sync::mpsc::channel; +use crate::sync::Arc; +use crate::thread; + +#[test] +fn test_full() { + let q: Queue> = Queue::new(); + q.push(Box::new(1)); + q.push(Box::new(2)); +} + +#[test] +fn test() { + let nthreads = 8; + let nmsgs = 1000; + let q = Queue::new(); + match q.pop() { + Empty => {} + Inconsistent | Data(..) => panic!(), + } + let (tx, rx) = channel(); + let q = Arc::new(q); + + for _ in 0..nthreads { + let tx = tx.clone(); + let q = q.clone(); + thread::spawn(move || { + for i in 0..nmsgs { + q.push(i); + } + tx.send(()).unwrap(); + }); + } + + let mut i = 0; + while i < nthreads * nmsgs { + match q.pop() { + Empty | Inconsistent => {} + Data(_) => i += 1, + } + } + drop(tx); + for _ in 0..nthreads { + rx.recv().unwrap(); + } +} diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs new file mode 100644 index 000000000..0e259b8ae --- /dev/null +++ b/library/std/src/sync/mpsc/oneshot.rs @@ -0,0 +1,315 @@ +/// Oneshot channels/ports +/// +/// This is the initial flavor of channels/ports used for comm module. This is +/// an optimization for the one-use case of a channel. The major optimization of +/// this type is to have one and exactly one allocation when the chan/port pair +/// is created. +/// +/// Another possible optimization would be to not use an Arc box because +/// in theory we know when the shared packet can be deallocated (no real need +/// for the atomic reference counting), but I was having trouble how to destroy +/// the data early in a drop of a Port. +/// +/// # Implementation +/// +/// Oneshots are implemented around one atomic usize variable. This variable +/// indicates both the state of the port/chan but also contains any threads +/// blocked on the port. All atomic operations happen on this one word. +/// +/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect +/// on behalf of the channel side of things (it can be mentally thought of as +/// consuming the port). This upgrade is then also stored in the shared packet. +/// The one caveat to consider is that when a port sees a disconnected channel +/// it must check for data because there is no "data plus upgrade" state. +pub use self::Failure::*; +use self::MyUpgrade::*; +pub use self::UpgradeResult::*; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::sync::atomic::{AtomicPtr, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::Receiver; +use crate::time::Instant; + +// Various states you can find a port in. 
+const EMPTY: *mut u8 = ptr::invalid_mut::(0); // initial state: no data, no blocked receiver +const DATA: *mut u8 = ptr::invalid_mut::(1); // data ready for receiver to take +const DISCONNECTED: *mut u8 = ptr::invalid_mut::(2); // channel is disconnected OR upgraded +// Any other value represents a pointer to a SignalToken value. The +// protocol ensures that when the state moves *to* a pointer, +// ownership of the token is given to the packet, and when the state +// moves *from* a pointer, ownership of the token is transferred to +// whoever changed the state. + +pub struct Packet { + // Internal state of the chan/port pair (stores the blocked thread as well) + state: AtomicPtr, + // One-shot data slot location + data: UnsafeCell>, + // when used for the second time, a oneshot channel must be upgraded, and + // this contains the slot for the upgrade + upgrade: UnsafeCell>, +} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Receiver), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(SignalToken), +} + +enum MyUpgrade { + NothingSent, + SendUsed, + GoUp(Receiver), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + data: UnsafeCell::new(None), + upgrade: UnsafeCell::new(NothingSent), + state: AtomicPtr::new(EMPTY), + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + unsafe { + // Sanity check + match *self.upgrade.get() { + NothingSent => {} + _ => panic!("sending on a oneshot that's already sent on "), + } + assert!((*self.data.get()).is_none()); + ptr::write(self.data.get(), Some(t)); + ptr::write(self.upgrade.get(), SendUsed); + + match self.state.swap(DATA, Ordering::SeqCst) { + // Sent the data, no one was waiting + EMPTY => Ok(()), + + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. + DISCONNECTED => { + self.state.swap(DISCONNECTED, Ordering::SeqCst); + ptr::write(self.upgrade.get(), NothingSent); + Err((&mut *self.data.get()).take().unwrap()) + } + + // Not possible, these are one-use channels + DATA => unreachable!(), + + // There is a thread waiting on the other end. We leave the 'DATA' + // state inside so it'll pick it up on the other end. + ptr => { + SignalToken::from_raw(ptr).signal(); + Ok(()) + } + } + } + } + + // Just tests whether this channel has been sent on or not, this is only + // safe to use from the sender. + pub fn sent(&self) -> bool { + unsafe { !matches!(*self.upgrade.get(), NothingSent) } + } + + pub fn recv(&self, deadline: Option) -> Result> { + // Attempt to not block the thread (it's a little expensive). If it looks + // like we're not empty, then immediately go through to `try_recv`. 
+ if self.state.load(Ordering::SeqCst) == EMPTY { + let (wait_token, signal_token) = blocking::tokens(); + let ptr = unsafe { signal_token.to_raw() }; + + // race with senders to enter the blocking state + if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + // Try to reset the state + if timed_out { + self.abort_selection().map_err(Upgraded)?; + } + } else { + wait_token.wait(); + debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY); + } + } else { + // drop the signal token, since we never blocked + drop(unsafe { SignalToken::from_raw(ptr) }); + } + } + + self.try_recv() + } + + pub fn try_recv(&self) -> Result> { + unsafe { + match self.state.load(Ordering::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + let _ = self.state.compare_exchange( + DATA, + EMPTY, + Ordering::SeqCst, + Ordering::SeqCst, + ); + match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => unreachable!(), + } + } + + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => match (&mut *self.data.get()).take() { + Some(data) => Ok(data), + None => match ptr::replace(self.upgrade.get(), SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)), + }, + }, + + // We are the sole receiver; there cannot be a blocking + // receiver already. + _ => unreachable!(), + } + } + } + + // Returns whether the upgrade was completed. If the upgrade wasn't + // completed, then the port couldn't get sent to the other half (it will + // never receive it). + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { + unsafe { + let prev = match *self.upgrade.get() { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => panic!("upgrading again"), + }; + ptr::write(self.upgrade.get(), GoUp(up)); + + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. 
+ DISCONNECTED => { + ptr::replace(self.upgrade.get(), prev); + UpDisconnected + } + + // If someone's waiting, we gotta wake them up + ptr => UpWoke(SignalToken::from_raw(ptr)), + } + } + } + + pub fn drop_chan(&self) { + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + DATA | DISCONNECTED | EMPTY => {} + + // If someone's waiting, we gotta wake them up + ptr => unsafe { + SignalToken::from_raw(ptr).signal(); + }, + } + } + + pub fn drop_port(&self) { + match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // An empty channel has nothing to do, and a remotely disconnected + // channel also has nothing to do b/c we're about to run the drop + // glue + DISCONNECTED | EMPTY => {} + + // There's data on the channel, so make sure we destroy it promptly. + // This is why not using an arc is a little difficult (need the box + // to stay valid while we take the data). + DATA => unsafe { + (&mut *self.data.get()).take().unwrap(); + }, + + // We're the only ones that can block on this port + _ => unreachable!(), + } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Remove a previous selecting thread from this port. This ensures that the + // blocked thread will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&self) -> Result> { + let state = match self.state.load(Ordering::SeqCst) { + // Each of these states means that no further activity will happen + // with regard to abortion selection + s @ (EMPTY | DATA | DISCONNECTED) => s, + + // If we've got a blocked thread, then use an atomic to gain ownership + // of it (may fail) + ptr => self + .state + .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst) + .unwrap_or_else(|x| x), + }; + + // Now that we've got ownership of our state, figure out what to do + // about it. + match state { + EMPTY => unreachable!(), + // our thread used for select was stolen + DATA => Ok(true), + + // If the other end has hung up, then we have complete ownership + // of the port. First, check if there was data waiting for us. This + // is possible if the other end sent something and then hung up. + // + // We then need to check to see if there was an upgrade requested, + // and if so, the upgraded port needs to have its selection aborted. + DISCONNECTED => unsafe { + if (*self.data.get()).is_some() { + Ok(true) + } else { + match ptr::replace(self.upgrade.get(), SendUsed) { + GoUp(port) => Err(port), + _ => Ok(true), + } + } + }, + + // We woke ourselves up from select. + ptr => unsafe { + drop(SignalToken::from_raw(ptr)); + Ok(false) + }, + } + } +} + +impl Drop for Packet { + fn drop(&mut self) { + assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); + } +} diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs new file mode 100644 index 000000000..51917bd96 --- /dev/null +++ b/library/std/src/sync/mpsc/shared.rs @@ -0,0 +1,501 @@ +/// Shared channels. +/// +/// This is the flavor of channels which are not necessarily optimized for any +/// particular use case, but are the most general in how they are used. Shared +/// channels are cloneable allowing for multiple senders. +/// +/// High level implementation details can be found in the comment of the parent +/// module. 
You'll also note that the implementation of the shared and stream +/// channels are quite similar, and this is no coincidence! +pub use self::Failure::*; +use self::StartResult::*; + +use core::cmp; +use core::intrinsics::abort; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::mpsc_queue as mpsc; +use crate::sync::{Mutex, MutexGuard}; +use crate::thread; +use crate::time::Instant; + +const DISCONNECTED: isize = isize::MIN; +const FUDGE: isize = 1024; +const MAX_REFCOUNT: usize = (isize::MAX) as usize; +#[cfg(test)] +const MAX_STEALS: isize = 5; +#[cfg(not(test))] +const MAX_STEALS: isize = 1 << 20; +const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver + +pub struct Packet { + queue: mpsc::Queue, + cnt: AtomicIsize, // How many items are on this channel + steals: UnsafeCell, // How many times has a port received without blocking? + to_wake: AtomicPtr, // SignalToken for wake up + + // The number of channels which are currently using this packet. + channels: AtomicUsize, + + // See the discussion in Port::drop and the channel send methods for what + // these are used for + port_dropped: AtomicBool, + sender_drain: AtomicIsize, + + // this lock protects various portions of this implementation during + // select() + select_lock: Mutex<()>, +} + +pub enum Failure { + Empty, + Disconnected, +} + +#[derive(PartialEq, Eq)] +enum StartResult { + Installed, + Abort, +} + +impl Packet { + // Creation of a packet *must* be followed by a call to postinit_lock + // and later by inherit_blocker + pub fn new() -> Packet { + Packet { + queue: mpsc::Queue::new(), + cnt: AtomicIsize::new(0), + steals: UnsafeCell::new(0), + to_wake: AtomicPtr::new(EMPTY), + channels: AtomicUsize::new(2), + port_dropped: AtomicBool::new(false), + sender_drain: AtomicIsize::new(0), + select_lock: Mutex::new(()), + } + } + + // This function should be used after newly created Packet + // was wrapped with an Arc + // In other case mutex data will be duplicated while cloning + // and that could cause problems on platforms where it is + // represented by opaque data structure + pub fn postinit_lock(&self) -> MutexGuard<'_, ()> { + self.select_lock.lock().unwrap() + } + + // This function is used at the creation of a shared packet to inherit a + // previously blocked thread. This is done to prevent spurious wakeups of + // threads in select(). + // + // This can only be called at channel-creation time + pub fn inherit_blocker(&self, token: Option, guard: MutexGuard<'_, ()>) { + if let Some(token) = token { + assert_eq!(self.cnt.load(Ordering::SeqCst), 0); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); + self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst); + self.cnt.store(-1, Ordering::SeqCst); + + // This store is a little sketchy. What's happening here is that + // we're transferring a blocker from a oneshot or stream channel to + // this shared channel. In doing so, we never spuriously wake them + // up and rather only wake them up at the appropriate time. This + // implementation of shared channels assumes that any blocking + // recv() will undo the increment of steals performed in try_recv() + // once the recv is complete. This thread that we're inheriting, + // however, is not in the middle of recv. 
Hence, the first time we + // wake them up, they're going to wake up from their old port, move + // on to the upgraded port, and then call the block recv() function. + // + // When calling this function, they'll find there's data immediately + // available, counting it as a steal. This in fact wasn't a steal + // because we appropriately blocked them waiting for data. + // + // To offset this bad increment, we initially set the steal count to + // -1. You'll find some special code in abort_selection() as well to + // ensure that this -1 steal count doesn't escape too far. + unsafe { + *self.steals.get() = -1; + } + } + + // When the shared packet is constructed, we grabbed this lock. The + // purpose of this lock is to ensure that abort_selection() doesn't + // interfere with this method. After we unlock this lock, we're + // signifying that we're done modifying self.cnt and self.to_wake and + // the port is ready for the world to continue using it. + drop(guard); + } + + pub fn send(&self, t: T) -> Result<(), T> { + // See Port::drop for what's going on + if self.port_dropped.load(Ordering::SeqCst) { + return Err(t); + } + + // Note that the multiple sender case is a little trickier + // semantically than the single sender case. The logic for + // incrementing is "add and if disconnected store disconnected". + // This could end up leading some senders to believe that there + // wasn't a disconnect if in fact there was a disconnect. This means + // that while one thread is attempting to re-store the disconnected + // states, other threads could walk through merrily incrementing + // this very-negative disconnected count. To prevent senders from + // spuriously attempting to send when the channels is actually + // disconnected, the count has a ranged check here. + // + // This is also done for another reason. Remember that the return + // value of this function is: + // + // `true` == the data *may* be received, this essentially has no + // meaning + // `false` == the data will *never* be received, this has a lot of + // meaning + // + // In the SPSC case, we have a check of 'queue.is_empty()' to see + // whether the data was actually received, but this same condition + // means nothing in a multi-producer context. As a result, this + // preflight check serves as the definitive "this will never be + // received". Once we get beyond this check, we have permanently + // entered the realm of "this may be received" + if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE { + return Err(t); + } + + self.queue.push(t); + match self.cnt.fetch_add(1, Ordering::SeqCst) { + -1 => { + self.take_to_wake().signal(); + } + + // In this case, we have possibly failed to send our data, and + // we need to consider re-popping the data in order to fully + // destroy it. We must arbitrate among the multiple senders, + // however, because the queues that we're using are + // single-consumer queues. In order to do this, all exiting + // pushers will use an atomic count in order to count those + // flowing through. Pushers who see 0 are required to drain as + // much as possible, and then can only exit when they are the + // only pusher (otherwise they must try again). + n if n < DISCONNECTED + FUDGE => { + // see the comment in 'try' for a shared channel for why this + // window of "not disconnected" is ok. 
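+                // Our fetch_add above nudged the count away from the DISCONNECTED
+                // sentinel, so re-pin it before arbitrating the drain below.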
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst); + + if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 { + loop { + // drain the queue, for info on the thread yield see the + // discussion in try_recv + loop { + match self.queue.pop() { + mpsc::Data(..) => {} + mpsc::Empty => break, + mpsc::Inconsistent => thread::yield_now(), + } + } + // maybe we're done, if we're not the last ones + // here, then we need to go try again. + if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 { + break; + } + } + + // At this point, there may still be data on the queue, + // but only if the count hasn't been incremented and + // some other sender hasn't finished pushing data just + // yet. That sender in question will drain its own data. + } + } + + // Can't make any assumptions about this case like in the SPSC case. + _ => {} + } + + Ok(()) + } + + pub fn recv(&self, deadline: Option) -> Result { + // This code is essentially the exact same as that found in the stream + // case (see stream.rs) + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + let (wait_token, signal_token) = blocking::tokens(); + if self.decrement(signal_token) == Installed { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + if timed_out { + self.abort_selection(false); + } + } else { + wait_token.wait(); + } + } + + match self.try_recv() { + data @ Ok(..) => unsafe { + *self.steals.get() -= 1; + data + }, + data => data, + } + } + + // Essentially the exact same thing as the stream decrement function. + // Returns true if blocking should proceed. + fn decrement(&self, token: SignalToken) -> StartResult { + unsafe { + assert_eq!( + self.to_wake.load(Ordering::SeqCst), + EMPTY, + "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364" + ); + let ptr = token.to_raw(); + self.to_wake.store(ptr, Ordering::SeqCst); + + let steals = ptr::replace(self.steals.get(), 0); + + match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { + return Installed; + } + } + } + + self.to_wake.store(EMPTY, Ordering::SeqCst); + drop(SignalToken::from_raw(ptr)); + Abort + } + } + + pub fn try_recv(&self) -> Result { + let ret = match self.queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + + // This is a bit of an interesting case. The channel is reported as + // having data available, but our pop() has failed due to the queue + // being in an inconsistent state. This means that there is some + // pusher somewhere which has yet to complete, but we are guaranteed + // that a pop will eventually succeed. In this case, we spin in a + // yield loop because the remote sender should finish their enqueue + // operation "very quickly". + // + // Avoiding this yield loop would require a different queue + // abstraction which provides the guarantee that after M pushes have + // succeeded, at least M pops will succeed. The current queues + // guarantee that if there are N active pushes, you can pop N times + // once all N have finished. 
+ mpsc::Inconsistent => { + let data; + loop { + thread::yield_now(); + match self.queue.pop() { + mpsc::Data(t) => { + data = t; + break; + } + mpsc::Empty => panic!("inconsistent => empty"), + mpsc::Inconsistent => {} + } + } + Some(data) + } + }; + match ret { + // See the discussion in the stream implementation for why we + // might decrement steals. + Some(data) => unsafe { + if *self.steals.get() > MAX_STEALS { + match self.cnt.swap(0, Ordering::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + } + n => { + let m = cmp::min(n, *self.steals.get()); + *self.steals.get() -= m; + self.bump(n - m); + } + } + assert!(*self.steals.get() >= 0); + } + *self.steals.get() += 1; + Ok(data) + }, + + // See the discussion in the stream implementation for why we try + // again. + None => { + match self.cnt.load(Ordering::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + _ => { + match self.queue.pop() { + mpsc::Data(t) => Ok(t), + mpsc::Empty => Err(Disconnected), + // with no senders, an inconsistency is impossible. + mpsc::Inconsistent => unreachable!(), + } + } + } + } + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&self) { + let old_count = self.channels.fetch_add(1, Ordering::SeqCst); + + // See comments on Arc::clone() on why we do this (for `mem::forget`). + if old_count > MAX_REFCOUNT { + abort(); + } + } + + // Decrement the reference count on a channel. This is called whenever a + // Chan is dropped and may end up waking up a receiver. It's the receiver's + // responsibility on the other end to figure out that we've disconnected. + pub fn drop_chan(&self) { + match self.channels.fetch_sub(1, Ordering::SeqCst) { + 1 => {} + n if n > 1 => return, + n => panic!("bad number of channels left {n}"), + } + + match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { + -1 => { + self.take_to_wake().signal(); + } + DISCONNECTED => {} + n => { + assert!(n >= 0); + } + } + } + + // See the long discussion inside of stream.rs for why the queue is drained, + // and why it is done in this fashion. + pub fn drop_port(&self) { + self.port_dropped.store(true, Ordering::SeqCst); + let mut steals = unsafe { *self.steals.get() }; + while { + match self.cnt.compare_exchange( + steals, + DISCONNECTED, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => false, + Err(old) => old != DISCONNECTED, + } + } { + // See the discussion in 'try_recv' for why we yield + // control of this thread. + loop { + match self.queue.pop() { + mpsc::Data(..) => { + steals += 1; + } + mpsc::Empty | mpsc::Inconsistent => break, + } + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&self) -> SignalToken { + let ptr = self.to_wake.load(Ordering::SeqCst); + self.to_wake.store(EMPTY, Ordering::SeqCst); + assert!(ptr != EMPTY); + unsafe { SignalToken::from_raw(ptr) } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // increment the count on the channel (used for selection) + fn bump(&self, amt: isize) -> isize { + match self.cnt.fetch_add(amt, Ordering::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, Ordering::SeqCst); + DISCONNECTED + } + n => n, + } + } + + // Cancels a previous thread waiting on this port, returning whether there's + // data on the port. 
+ // + // This is similar to the stream implementation (hence fewer comments), but + // uses a different value for the "steals" variable. + pub fn abort_selection(&self, _was_upgrade: bool) -> bool { + // Before we do anything else, we bounce on this lock. The reason for + // doing this is to ensure that any upgrade-in-progress is gone and + // done with. Without this bounce, we can race with inherit_blocker + // about looking at and dealing with to_wake. Once we have acquired the + // lock, we are guaranteed that inherit_blocker is done. + { + let _guard = self.select_lock.lock().unwrap(); + } + + // Like the stream implementation, we want to make sure that the count + // on the channel goes non-negative. We don't know how negative the + // stream currently is, so instead of using a steal value of 1, we load + // the channel count and figure out what we should do to make it + // positive. + let steals = { + let cnt = self.cnt.load(Ordering::SeqCst); + if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 } + }; + let prev = self.bump(steals + 1); + + if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); + true + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + if prev < 0 { + drop(self.take_to_wake()); + } else { + while self.to_wake.load(Ordering::SeqCst) != EMPTY { + thread::yield_now(); + } + } + unsafe { + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + let old = self.steals.get(); + assert!(*old == 0 || *old == -1); + *old = steals; + prev >= 0 + } + } + } +} + +impl Drop for Packet { + fn drop(&mut self) { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); + assert_eq!(self.channels.load(Ordering::SeqCst), 0); + } +} diff --git a/library/std/src/sync/mpsc/spsc_queue.rs b/library/std/src/sync/mpsc/spsc_queue.rs new file mode 100644 index 000000000..7e745eb31 --- /dev/null +++ b/library/std/src/sync/mpsc/spsc_queue.rs @@ -0,0 +1,236 @@ +//! A single-producer single-consumer concurrent queue +//! +//! This module contains the implementation of an SPSC queue which can be used +//! concurrently between two threads. This data structure is safe to use and +//! enforces the semantics that there is one pusher and one popper. + +// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +use core::cell::UnsafeCell; +use core::ptr; + +use crate::boxed::Box; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; + +use super::cache_aligned::CacheAligned; + +// Node within the linked list queue of messages to send +struct Node { + // FIXME: this could be an uninitialized T if we're careful enough, and + // that would reduce memory usage (and be a bit faster). + // is it worth it? + value: Option, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache + next: AtomicPtr>, // next node in the queue +} + +/// The single-producer single-consumer queue. 
This structure is not cloneable, +/// but it can be safely shared in an Arc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue { + // consumer fields + consumer: CacheAligned>, + + // producer fields + producer: CacheAligned>, +} + +struct Consumer { + tail: UnsafeCell<*mut Node>, // where to pop from + tail_prev: AtomicPtr>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cacheable + addition: Addition, +} + +struct Producer { + head: UnsafeCell<*mut Node>, // where to push to + first: UnsafeCell<*mut Node>, // where to get new nodes from + tail_copy: UnsafeCell<*mut Node>, // between first/tail + addition: Addition, +} + +unsafe impl Send for Queue {} + +unsafe impl Sync for Queue {} + +impl Node { + fn new() -> *mut Node { + Box::into_raw(box Node { + value: None, + cached: false, + next: AtomicPtr::new(ptr::null_mut::>()), + }) + } +} + +impl Queue { + /// Creates a new queue. With given additional elements in the producer and + /// consumer portions of the queue. + /// + /// Due to the performance implications of cache-contention, + /// we wish to keep fields used mainly by the producer on a separate cache + /// line than those used by the consumer. + /// Since cache lines are usually 64 bytes, it is unreasonably expensive to + /// allocate one for small fields, so we allow users to insert additional + /// fields into the cache lines already allocated by this for the producer + /// and consumer. + /// + /// This is unsafe as the type system doesn't enforce a single + /// consumer-producer relationship. It also allows the consumer to `pop` + /// items while there is a `peek` active due to all methods having a + /// non-mutable receiver. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub unsafe fn with_additions( + bound: usize, + producer_addition: ProducerAddition, + consumer_addition: ConsumerAddition, + ) -> Self { + let n1 = Node::new(); + let n2 = Node::new(); + (*n1).next.store(n2, Ordering::Relaxed); + Queue { + consumer: CacheAligned::new(Consumer { + tail: UnsafeCell::new(n2), + tail_prev: AtomicPtr::new(n1), + cache_bound: bound, + cached_nodes: AtomicUsize::new(0), + addition: consumer_addition, + }), + producer: CacheAligned::new(Producer { + head: UnsafeCell::new(n2), + first: UnsafeCell::new(n1), + tail_copy: UnsafeCell::new(n1), + addition: producer_addition, + }), + } + } + + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. 
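+            // Publication to the consumer happens via the Release store on the old
+            // head's `next` pointer below, which pairs with the Acquire load of
+            // `next` in `pop` and `peek`.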
+ let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(ptr::null_mut(), Ordering::Relaxed); + (**self.producer.head.get()).next.store(n, Ordering::Release); + *(&self.producer.head).get() = n; + } + } + + unsafe fn alloc(&self) -> *mut Node { + // First try to see if we can consume the 'first' node for our uses. + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); + return ret; + } + // If the above fails, then update our copy of the tail and try + // again. + *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); + return ret; + } + // If all of that fails, then we have to allocate a new node + // (there's nothing in the node cache). + Node::new() + } + + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&self) -> Option { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = *self.consumer.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() { + return None; + } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); + + *self.consumer.0.tail.get() = next; + if self.consumer.cache_bound == 0 { + self.consumer.tail_prev.store(tail, Ordering::Release); + } else { + let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); + if cached_nodes < self.consumer.cache_bound && !(*tail).cached { + self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); + (*tail).cached = true; + } + + if (*tail).cached { + self.consumer.tail_prev.store(tail, Ordering::Release); + } else { + (*self.consumer.tail_prev.load(Ordering::Relaxed)) + .next + .store(next, Ordering::Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: Box> = Box::from_raw(tail); + } + } + ret + } + } + + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// has no data currently + /// + /// # Warning + /// The reference returned is invalid if it is not used before the consumer + /// pops the value off the queue. If the producer then pushes another value + /// onto the queue, it will overwrite the value pointed to by the reference. + pub fn peek(&self) -> Option<&mut T> { + // This is essentially the same as above with all the popping bits + // stripped out. 
+ unsafe { + let tail = *self.consumer.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + if next.is_null() { None } else { (*next).value.as_mut() } + } + } + + pub fn producer_addition(&self) -> &ProducerAddition { + &self.producer.addition + } + + pub fn consumer_addition(&self) -> &ConsumerAddition { + &self.consumer.addition + } +} + +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + let mut cur = *self.producer.first.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + let _n: Box> = Box::from_raw(cur); + cur = next; + } + } + } +} diff --git a/library/std/src/sync/mpsc/spsc_queue/tests.rs b/library/std/src/sync/mpsc/spsc_queue/tests.rs new file mode 100644 index 000000000..467ef3dbd --- /dev/null +++ b/library/std/src/sync/mpsc/spsc_queue/tests.rs @@ -0,0 +1,101 @@ +use super::Queue; +use crate::sync::mpsc::channel; +use crate::sync::Arc; +use crate::thread; + +#[test] +fn smoke() { + unsafe { + let queue = Queue::with_additions(0, (), ()); + queue.push(1); + queue.push(2); + assert_eq!(queue.pop(), Some(1)); + assert_eq!(queue.pop(), Some(2)); + assert_eq!(queue.pop(), None); + queue.push(3); + queue.push(4); + assert_eq!(queue.pop(), Some(3)); + assert_eq!(queue.pop(), Some(4)); + assert_eq!(queue.pop(), None); + } +} + +#[test] +fn peek() { + unsafe { + let queue = Queue::with_additions(0, (), ()); + queue.push(vec![1]); + + // Ensure the borrowchecker works + match queue.peek() { + Some(vec) => { + assert_eq!(&*vec, &[1]); + } + None => unreachable!(), + } + + match queue.pop() { + Some(vec) => { + assert_eq!(&*vec, &[1]); + } + None => unreachable!(), + } + } +} + +#[test] +fn drop_full() { + unsafe { + let q: Queue> = Queue::with_additions(0, (), ()); + q.push(Box::new(1)); + q.push(Box::new(2)); + } +} + +#[test] +fn smoke_bound() { + unsafe { + let q = Queue::with_additions(0, (), ()); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); + } +} + +#[test] +fn stress() { + unsafe { + stress_bound(0); + stress_bound(1); + } + + unsafe fn stress_bound(bound: usize) { + let q = Arc::new(Queue::with_additions(bound, (), ())); + + let (tx, rx) = channel(); + let q2 = q.clone(); + let _t = thread::spawn(move || { + for _ in 0..100000 { + loop { + match q2.pop() { + Some(1) => break, + Some(_) => panic!(), + None => {} + } + } + } + tx.send(()).unwrap(); + }); + for _ in 0..100000 { + q.push(1); + } + rx.recv().unwrap(); + } +} diff --git a/library/std/src/sync/mpsc/stream.rs b/library/std/src/sync/mpsc/stream.rs new file mode 100644 index 000000000..4c3812c79 --- /dev/null +++ b/library/std/src/sync/mpsc/stream.rs @@ -0,0 +1,457 @@ +/// Stream channels +/// +/// This is the flavor of channels which are optimized for one sender and one +/// receiver. The sender will be upgraded to a shared channel if the channel is +/// cloned. +/// +/// High level implementation details can be found in the comment of the parent +/// module. 
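+///
+/// As a rough sketch of how this flavor is reached through the public API (the
+/// items below are ordinary `std::sync::mpsc` exports, not part of this
+/// module), a channel with a single, un-cloned sender that sends more than one
+/// message ends up backed by a stream packet; cloning the sender would instead
+/// upgrade it to the shared flavor:
+///
+/// ```
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (tx, rx) = channel();
+/// let handle = thread::spawn(move || {
+///     // a second send is what upgrades the underlying oneshot into a stream
+///     tx.send(1).unwrap();
+///     tx.send(2).unwrap();
+/// });
+/// assert_eq!(rx.recv().unwrap(), 1);
+/// assert_eq!(rx.recv().unwrap(), 2);
+/// handle.join().unwrap();
+/// ```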
+pub use self::Failure::*; +use self::Message::*; +pub use self::UpgradeResult::*; + +use core::cmp; + +use crate::cell::UnsafeCell; +use crate::ptr; +use crate::thread; +use crate::time::Instant; + +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken}; +use crate::sync::mpsc::spsc_queue as spsc; +use crate::sync::mpsc::Receiver; + +const DISCONNECTED: isize = isize::MIN; +#[cfg(test)] +const MAX_STEALS: isize = 5; +#[cfg(not(test))] +const MAX_STEALS: isize = 1 << 20; +const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver + +pub struct Packet { + // internal queue for all messages + queue: spsc::Queue, ProducerAddition, ConsumerAddition>, +} + +struct ProducerAddition { + cnt: AtomicIsize, // How many items are on this channel + to_wake: AtomicPtr, // SignalToken for the blocked thread to wake up + + port_dropped: AtomicBool, // flag if the channel has been destroyed. +} + +struct ConsumerAddition { + steals: UnsafeCell, // How many times has a port received without blocking? +} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Receiver), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(SignalToken), +} + +// Any message could contain an "upgrade request" to a new shared port, so the +// internal queue it's a queue of T, but rather Message +enum Message { + Data(T), + GoUp(Receiver), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + queue: unsafe { + spsc::Queue::with_additions( + 128, + ProducerAddition { + cnt: AtomicIsize::new(0), + to_wake: AtomicPtr::new(EMPTY), + + port_dropped: AtomicBool::new(false), + }, + ConsumerAddition { steals: UnsafeCell::new(0) }, + ) + }, + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + // If the other port has deterministically gone away, then definitely + // must return the data back up the stack. Otherwise, the data is + // considered as being sent. + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return Err(t); + } + + match self.do_send(Data(t)) { + UpSuccess | UpDisconnected => {} + UpWoke(token) => { + token.signal(); + } + } + Ok(()) + } + + pub fn upgrade(&self, up: Receiver) -> UpgradeResult { + // If the port has gone away, then there's no need to proceed any + // further. + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return UpDisconnected; + } + + self.do_send(GoUp(up)) + } + + fn do_send(&self, t: Message) -> UpgradeResult { + self.queue.push(t); + match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) { + // As described in the mod's doc comment, -1 == wakeup + -1 => UpWoke(self.take_to_wake()), + // As as described before, SPSC queues must be >= -2 + -2 => UpSuccess, + + // Be sure to preserve the disconnected state, and the return value + // in this case is going to be whether our data was received or not. + // This manifests itself on whether we have an empty queue or not. + // + // Primarily, are required to drain the queue here because the port + // will never remove this data. We can only have at most one item to + // drain (the port drains the rest). + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + let first = self.queue.pop(); + let second = self.queue.pop(); + assert!(second.is_none()); + + match first { + Some(..) 
=> UpSuccess, // we failed to send the data + None => UpDisconnected, // we successfully sent data + } + } + + // Otherwise we just sent some data on a non-waiting queue, so just + // make sure the world is sane and carry on! + n => { + assert!(n >= 0); + UpSuccess + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&self) -> SignalToken { + let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst); + assert!(ptr != EMPTY); + unsafe { SignalToken::from_raw(ptr) } + } + + // Decrements the count on the channel for a sleeper, returning the sleeper + // back if it shouldn't sleep. Note that this is the location where we take + // steals into account. + fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); + let ptr = unsafe { token.to_raw() }; + self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst); + + let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }; + + match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { + return Ok(()); + } + } + } + + self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst); + Err(unsafe { SignalToken::from_raw(ptr) }) + } + + pub fn recv(&self, deadline: Option) -> Result> { + // Optimistic preflight check (scheduling is expensive). + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + // Welp, our channel has no data. Deschedule the current thread and + // initiate the blocking protocol. + let (wait_token, signal_token) = blocking::tokens(); + if self.decrement(signal_token).is_ok() { + if let Some(deadline) = deadline { + let timed_out = !wait_token.wait_max_until(deadline); + if timed_out { + self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?; + } + } else { + wait_token.wait(); + } + } + + match self.try_recv() { + // Messages which actually popped from the queue shouldn't count as + // a steal, so offset the decrement here (we already have our + // "steal" factored into the channel count above). + data @ (Ok(..) | Err(Upgraded(..))) => unsafe { + *self.queue.consumer_addition().steals.get() -= 1; + data + }, + + data => data, + } + } + + pub fn try_recv(&self) -> Result> { + match self.queue.pop() { + // If we stole some data, record to that effect (this will be + // factored into cnt later on). + // + // Note that we don't allow steals to grow without bound in order to + // prevent eventual overflow of either steals or cnt as an overflow + // would have catastrophic results. Sometimes, steals > cnt, but + // other times cnt > steals, so we don't know the relation between + // steals and cnt. This code path is executed only rarely, so we do + // a pretty slow operation, of swapping 0 into cnt, taking steals + // down as much as possible (without going negative), and then + // adding back in whatever we couldn't factor into steals. 
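+            //
+            // For example, if the swap returns 12 while our steal count is 9, the
+            // steal count drops to 0 and the leftover 3 is added back through
+            // `bump`; if it returns 7 instead, the steal count only drops to 2 and
+            // nothing is bumped.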
+ Some(data) => unsafe { + if *self.queue.consumer_addition().steals.get() > MAX_STEALS { + match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) { + DISCONNECTED => { + self.queue + .producer_addition() + .cnt + .store(DISCONNECTED, Ordering::SeqCst); + } + n => { + let m = cmp::min(n, *self.queue.consumer_addition().steals.get()); + *self.queue.consumer_addition().steals.get() -= m; + self.bump(n - m); + } + } + assert!(*self.queue.consumer_addition().steals.get() >= 0); + } + *self.queue.consumer_addition().steals.get() += 1; + match data { + Data(t) => Ok(t), + GoUp(up) => Err(Upgraded(up)), + } + }, + + None => { + match self.queue.producer_addition().cnt.load(Ordering::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + + // This is a little bit of a tricky case. We failed to pop + // data above, and then we have viewed that the channel is + // disconnected. In this window more data could have been + // sent on the channel. It doesn't really make sense to + // return that the channel is disconnected when there's + // actually data on it, so be extra sure there's no data by + // popping one more time. + // + // We can ignore steals because the other end is + // disconnected and we'll never need to really factor in our + // steals again. + _ => match self.queue.pop() { + Some(Data(t)) => Ok(t), + Some(GoUp(up)) => Err(Upgraded(up)), + None => Err(Disconnected), + }, + } + } + } + } + + pub fn drop_chan(&self) { + // Dropping a channel is pretty simple, we just flag it as disconnected + // and then wakeup a blocker if there is one. + match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) { + -1 => { + self.take_to_wake().signal(); + } + DISCONNECTED => {} + n => { + assert!(n >= 0); + } + } + } + + pub fn drop_port(&self) { + // Dropping a port seems like a fairly trivial thing. In theory all we + // need to do is flag that we're disconnected and then everything else + // can take over (we don't have anyone to wake up). + // + // The catch for Ports is that we want to drop the entire contents of + // the queue. There are multiple reasons for having this property, the + // largest of which is that if another chan is waiting in this channel + // (but not received yet), then waiting on that port will cause a + // deadlock. + // + // So if we accept that we must now destroy the entire contents of the + // queue, this code may make a bit more sense. The tricky part is that + // we can't let any in-flight sends go un-dropped, we have to make sure + // *everything* is dropped and nothing new will come onto the channel. + + // The first thing we do is set a flag saying that we're done for. All + // sends are gated on this flag, so we're immediately guaranteed that + // there are a bounded number of active sends that we'll have to deal + // with. + self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst); + + // Now that we're guaranteed to deal with a bounded number of senders, + // we need to drain the queue. This draining process happens atomically + // with respect to the "count" of the channel. If the count is nonzero + // (with steals taken into account), then there must be data on the + // channel. In this case we drain everything and then try again. We will + // continue to fail while active senders send data while we're dropping + // data, but eventually we're guaranteed to break out of this loop + // (because there is a bounded number of senders). 
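+        //
+        // Concretely, the loop below keeps trying to compare-and-swap the current
+        // steal count for DISCONNECTED; any failure other than seeing DISCONNECTED
+        // means more data arrived in the meantime, so we drain the queue, fold the
+        // popped messages into `steals`, and retry.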
+ let mut steals = unsafe { *self.queue.consumer_addition().steals.get() }; + while { + match self.queue.producer_addition().cnt.compare_exchange( + steals, + DISCONNECTED, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => false, + Err(old) => old != DISCONNECTED, + } + } { + while self.queue.pop().is_some() { + steals += 1; + } + } + + // At this point in time, we have gated all future senders from sending, + // and we have flagged the channel as being disconnected. The senders + // still have some responsibility, however, because some sends might not + // complete until after we flag the disconnection. There are more + // details in the sending methods that see DISCONNECTED + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // increment the count on the channel (used for selection) + fn bump(&self, amt: isize) -> isize { + match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) { + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + DISCONNECTED + } + n => n, + } + } + + // Removes a previous thread from being blocked in this port + pub fn abort_selection(&self, was_upgrade: bool) -> Result> { + // If we're aborting selection after upgrading from a oneshot, then + // we're guarantee that no one is waiting. The only way that we could + // have seen the upgrade is if data was actually sent on the channel + // half again. For us, this means that there is guaranteed to be data on + // this channel. Furthermore, we're guaranteed that there was no + // start_selection previously, so there's no need to modify `self.cnt` + // at all. + // + // Hence, because of these invariants, we immediately return `Ok(true)`. + // Note that the data might not actually be sent on the channel just yet. + // The other end could have flagged the upgrade but not sent data to + // this end. This is fine because we know it's a small bounded windows + // of time until the data is actually sent. + if was_upgrade { + assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); + return Ok(true); + } + + // We want to make sure that the count on the channel goes non-negative, + // and in the stream case we can have at most one steal, so just assume + // that we had one steal. + let steals = 1; + let prev = self.bump(steals + 1); + + // If we were previously disconnected, then we know for sure that there + // is no thread in to_wake, so just keep going + let has_data = if prev == DISCONNECTED { + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); + true // there is data, that data is that we're disconnected + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + + // If the previous count was negative, then we just made things go + // positive, hence we passed the -1 boundary and we're responsible + // for removing the to_wake() field and trashing it. + // + // If the previous count was positive then we're in a tougher + // situation. A possible race is that a sender just incremented + // through -1 (meaning it's going to try to wake a thread up), but it + // hasn't yet read the to_wake. In order to prevent a future recv() + // from waking up too early (this sender picking up the plastered + // over to_wake), we spin loop here waiting for to_wake to be 0. 
+ // Note that this entire select() implementation needs an overhaul, + // and this is *not* the worst part of it, so this is not done as a + // final solution but rather out of necessity for now to get + // something working. + if prev < 0 { + drop(self.take_to_wake()); + } else { + while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY { + thread::yield_now(); + } + } + unsafe { + assert_eq!(*self.queue.consumer_addition().steals.get(), 0); + *self.queue.consumer_addition().steals.get() = steals; + } + + // if we were previously positive, then there's surely data to + // receive + prev >= 0 + }; + + // Now that we've determined that this queue "has data", we peek at the + // queue to see if the data is an upgrade or not. If it's an upgrade, + // then we need to destroy this port and abort selection on the + // upgraded port. + if has_data { + match self.queue.peek() { + Some(&mut GoUp(..)) => match self.queue.pop() { + Some(GoUp(port)) => Err(port), + _ => unreachable!(), + }, + _ => Ok(true), + } + } else { + Ok(false) + } + } +} + +impl Drop for Packet { + fn drop(&mut self) { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); + } +} diff --git a/library/std/src/sync/mpsc/sync.rs b/library/std/src/sync/mpsc/sync.rs new file mode 100644 index 000000000..733761671 --- /dev/null +++ b/library/std/src/sync/mpsc/sync.rs @@ -0,0 +1,495 @@ +use self::Blocker::*; +/// Synchronous channels/ports +/// +/// This channel implementation differs significantly from the asynchronous +/// implementations found next to it (oneshot/stream/share). This is an +/// implementation of a synchronous, bounded buffer channel. +/// +/// Each channel is created with some amount of backing buffer, and sends will +/// *block* until buffer space becomes available. A buffer size of 0 is valid, +/// which means that every successful send is paired with a successful recv. +/// +/// This flavor of channels defines a new `send_opt` method for channels which +/// is the method by which a message is sent but the thread does not panic if it +/// cannot be delivered. +/// +/// Another major difference is that send() will *always* return back the data +/// if it couldn't be sent. This is because it is deterministically known when +/// the data is received and when it is not received. +/// +/// Implementation-wise, it can all be summed up with "use a mutex plus some +/// logic". The mutex used here is an OS native mutex, meaning that no user code +/// is run inside of the mutex (to prevent context switching). This +/// implementation shares almost all code for the buffered and unbuffered cases +/// of a synchronous channel. There are a few branches for the unbuffered case, +/// but they're mostly just relevant to blocking senders. +pub use self::Failure::*; + +use core::intrinsics::abort; +use core::mem; +use core::ptr; + +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken}; +use crate::sync::{Mutex, MutexGuard}; +use crate::time::Instant; + +const MAX_REFCOUNT: usize = (isize::MAX) as usize; + +pub struct Packet { + /// Only field outside of the mutex. 
Just done for kicks, but mainly because + /// the other shared channel already had the code implemented + channels: AtomicUsize, + + lock: Mutex>, +} + +unsafe impl Send for Packet {} + +unsafe impl Sync for Packet {} + +struct State { + disconnected: bool, // Is the channel disconnected yet? + queue: Queue, // queue of senders waiting to send data + blocker: Blocker, // currently blocked thread on this channel + buf: Buffer, // storage for buffered messages + cap: usize, // capacity of this channel + + /// A curious flag used to indicate whether a sender failed or succeeded in + /// blocking. This is used to transmit information back to the thread that it + /// must dequeue its message from the buffer because it was not received. + /// This is only relevant in the 0-buffer case. This obviously cannot be + /// safely constructed, but it's guaranteed to always have a valid pointer + /// value. + canceled: Option<&'static mut bool>, +} + +unsafe impl Send for State {} + +/// Possible flavors of threads who can be blocked on this channel. +enum Blocker { + BlockedSender(SignalToken), + BlockedReceiver(SignalToken), + NoneBlocked, +} + +/// Simple queue for threading threads together. Nodes are stack-allocated, so +/// this structure is not safe at all +struct Queue { + head: *mut Node, + tail: *mut Node, +} + +struct Node { + token: Option, + next: *mut Node, +} + +unsafe impl Send for Node {} + +/// A simple ring-buffer +struct Buffer { + buf: Vec>, + start: usize, + size: usize, +} + +#[derive(Debug)] +pub enum Failure { + Empty, + Disconnected, +} + +/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` +/// in the meantime. This re-locks the mutex upon returning. +fn wait<'a, 'b, T>( + lock: &'a Mutex>, + mut guard: MutexGuard<'b, State>, + f: fn(SignalToken) -> Blocker, +) -> MutexGuard<'a, State> { + let (wait_token, signal_token) = blocking::tokens(); + match mem::replace(&mut guard.blocker, f(signal_token)) { + NoneBlocked => {} + _ => unreachable!(), + } + drop(guard); // unlock + wait_token.wait(); // block + lock.lock().unwrap() // relock +} + +/// Same as wait, but waiting at most until `deadline`. +fn wait_timeout_receiver<'a, 'b, T>( + lock: &'a Mutex>, + deadline: Instant, + mut guard: MutexGuard<'b, State>, + success: &mut bool, +) -> MutexGuard<'a, State> { + let (wait_token, signal_token) = blocking::tokens(); + match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) { + NoneBlocked => {} + _ => unreachable!(), + } + drop(guard); // unlock + *success = wait_token.wait_max_until(deadline); // block + let mut new_guard = lock.lock().unwrap(); // relock + if !*success { + abort_selection(&mut new_guard); + } + new_guard +} + +fn abort_selection(guard: &mut MutexGuard<'_, State>) -> bool { + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => true, + BlockedSender(token) => { + guard.blocker = BlockedSender(token); + true + } + BlockedReceiver(token) => { + drop(token); + false + } + } +} + +/// Wakes up a thread, dropping the lock at the correct time +fn wakeup(token: SignalToken, guard: MutexGuard<'_, State>) { + // We need to be careful to wake up the waiting thread *outside* of the mutex + // in case it incurs a context switch. 
+ drop(guard); + token.signal(); +} + +impl Packet { + pub fn new(capacity: usize) -> Packet { + Packet { + channels: AtomicUsize::new(1), + lock: Mutex::new(State { + disconnected: false, + blocker: NoneBlocked, + cap: capacity, + canceled: None, + queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() }, + buf: Buffer { + buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(), + start: 0, + size: 0, + }, + }), + } + } + + // wait until a send slot is available, returning locked access to + // the channel state. + fn acquire_send_slot(&self) -> MutexGuard<'_, State> { + let mut node = Node { token: None, next: ptr::null_mut() }; + loop { + let mut guard = self.lock.lock().unwrap(); + // are we ready to go? + if guard.disconnected || guard.buf.size() < guard.buf.capacity() { + return guard; + } + // no room; actually block + let wait_token = guard.queue.enqueue(&mut node); + drop(guard); + wait_token.wait(); + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + let mut guard = self.acquire_send_slot(); + if guard.disconnected { + return Err(t); + } + guard.buf.enqueue(t); + + match mem::replace(&mut guard.blocker, NoneBlocked) { + // if our capacity is 0, then we need to wait for a receiver to be + // available to take our data. After waiting, we check again to make + // sure the port didn't go away in the meantime. If it did, we need + // to hand back our data. + NoneBlocked if guard.cap == 0 => { + let mut canceled = false; + assert!(guard.canceled.is_none()); + guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); + let mut guard = wait(&self.lock, guard, BlockedSender); + if canceled { Err(guard.buf.dequeue()) } else { Ok(()) } + } + + // success, we buffered some data + NoneBlocked => Ok(()), + + // success, someone's about to receive our buffered data. + BlockedReceiver(token) => { + wakeup(token, guard); + Ok(()) + } + + BlockedSender(..) => panic!("lolwut"), + } + } + + pub fn try_send(&self, t: T) -> Result<(), super::TrySendError> { + let mut guard = self.lock.lock().unwrap(); + if guard.disconnected { + Err(super::TrySendError::Disconnected(t)) + } else if guard.buf.size() == guard.buf.capacity() { + Err(super::TrySendError::Full(t)) + } else if guard.cap == 0 { + // With capacity 0, even though we have buffer space we can't + // transfer the data unless there's a receiver waiting. + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => Err(super::TrySendError::Full(t)), + BlockedSender(..) => unreachable!(), + BlockedReceiver(token) => { + guard.buf.enqueue(t); + wakeup(token, guard); + Ok(()) + } + } + } else { + // If the buffer has some space and the capacity isn't 0, then we + // just enqueue the data for later retrieval, ensuring to wake up + // any blocked receiver if there is one. + assert!(guard.buf.size() < guard.buf.capacity()); + guard.buf.enqueue(t); + match mem::replace(&mut guard.blocker, NoneBlocked) { + BlockedReceiver(token) => wakeup(token, guard), + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + } + Ok(()) + } + } + + // Receives a message from this channel + // + // When reading this, remember that there can only ever be one receiver at + // time. + pub fn recv(&self, deadline: Option) -> Result { + let mut guard = self.lock.lock().unwrap(); + + let mut woke_up_after_waiting = false; + // Wait for the buffer to have something in it. No need for a + // while loop because we're the only receiver. 
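+        //
+        // `woke_up_after_waiting` is only true if we actually parked and were
+        // signalled: on the deadline path it takes the result of
+        // `wait_max_until`, so a timeout leaves it false and the buffer may
+        // legitimately still be empty below.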
+ if !guard.disconnected && guard.buf.size() == 0 { + if let Some(deadline) = deadline { + guard = + wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting); + } else { + guard = wait(&self.lock, guard, BlockedReceiver); + woke_up_after_waiting = true; + } + } + + // N.B., channel could be disconnected while waiting, so the order of + // these conditionals is important. + if guard.disconnected && guard.buf.size() == 0 { + return Err(Disconnected); + } + + // Pick up the data, wake up our neighbors, and carry on + assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting)); + + if guard.buf.size() == 0 { + return Err(Empty); + } + + let ret = guard.buf.dequeue(); + self.wakeup_senders(woke_up_after_waiting, guard); + Ok(ret) + } + + pub fn try_recv(&self) -> Result { + let mut guard = self.lock.lock().unwrap(); + + // Easy cases first + if guard.disconnected && guard.buf.size() == 0 { + return Err(Disconnected); + } + if guard.buf.size() == 0 { + return Err(Empty); + } + + // Be sure to wake up neighbors + let ret = Ok(guard.buf.dequeue()); + self.wakeup_senders(false, guard); + ret + } + + // Wake up pending senders after some data has been received + // + // * `waited` - flag if the receiver blocked to receive some data, or if it + // just picked up some data on the way out + // * `guard` - the lock guard that is held over this channel's lock + fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State>) { + let pending_sender1: Option = guard.queue.dequeue(); + + // If this is a no-buffer channel (cap == 0), then if we didn't wait we + // need to ACK the sender. If we waited, then the sender waking us up + // was already the ACK. + let pending_sender2 = if guard.cap == 0 && !waited { + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedReceiver(..) => unreachable!(), + BlockedSender(token) => { + guard.canceled.take(); + Some(token) + } + } + } else { + None + }; + mem::drop(guard); + + // only outside of the lock do we wake up the pending threads + if let Some(token) = pending_sender1 { + token.signal(); + } + if let Some(token) = pending_sender2 { + token.signal(); + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&self) { + let old_count = self.channels.fetch_add(1, Ordering::SeqCst); + + // See comments on Arc::clone() on why we do this (for `mem::forget`). + if old_count > MAX_REFCOUNT { + abort(); + } + } + + pub fn drop_chan(&self) { + // Only flag the channel as disconnected if we're the last channel + match self.channels.fetch_sub(1, Ordering::SeqCst) { + 1 => {} + _ => return, + } + + // Not much to do other than wake up a receiver if one's there + let mut guard = self.lock.lock().unwrap(); + if guard.disconnected { + return; + } + guard.disconnected = true; + match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + BlockedReceiver(token) => wakeup(token, guard), + } + } + + pub fn drop_port(&self) { + let mut guard = self.lock.lock().unwrap(); + + if guard.disconnected { + return; + } + guard.disconnected = true; + + // If the capacity is 0, then the sender may want its data back after + // we're disconnected. Otherwise it's now our responsibility to destroy + // the buffered data. As with many other portions of this code, this + // needs to be careful to destroy the data *outside* of the lock to + // prevent deadlock. 
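+        //
+        // Binding the buffer to `_data` below is what achieves this: the values
+        // are moved out while the lock is held, but they are only dropped when
+        // `_data` goes out of scope at the end of this function, after
+        // `mem::drop(guard)` has released the lock.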
+ let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() }; + let mut queue = + mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() }); + + let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedSender(token) => { + *guard.canceled.take().unwrap() = true; + Some(token) + } + BlockedReceiver(..) => unreachable!(), + }; + mem::drop(guard); + + while let Some(token) = queue.dequeue() { + token.signal(); + } + if let Some(token) = waiter { + token.signal(); + } + } +} + +impl Drop for Packet { + fn drop(&mut self) { + assert_eq!(self.channels.load(Ordering::SeqCst), 0); + let mut guard = self.lock.lock().unwrap(); + assert!(guard.queue.dequeue().is_none()); + assert!(guard.canceled.is_none()); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Buffer, a simple ring buffer backed by Vec +//////////////////////////////////////////////////////////////////////////////// + +impl Buffer { + fn enqueue(&mut self, t: T) { + let pos = (self.start + self.size) % self.buf.len(); + self.size += 1; + let prev = mem::replace(&mut self.buf[pos], Some(t)); + assert!(prev.is_none()); + } + + fn dequeue(&mut self) -> T { + let start = self.start; + self.size -= 1; + self.start = (self.start + 1) % self.buf.len(); + let result = &mut self.buf[start]; + result.take().unwrap() + } + + fn size(&self) -> usize { + self.size + } + fn capacity(&self) -> usize { + self.buf.len() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Queue, a simple queue to enqueue threads with (stack-allocated nodes) +//////////////////////////////////////////////////////////////////////////////// + +impl Queue { + fn enqueue(&mut self, node: &mut Node) -> WaitToken { + let (wait_token, signal_token) = blocking::tokens(); + node.token = Some(signal_token); + node.next = ptr::null_mut(); + + if self.tail.is_null() { + self.head = node as *mut Node; + self.tail = node as *mut Node; + } else { + unsafe { + (*self.tail).next = node as *mut Node; + self.tail = node as *mut Node; + } + } + + wait_token + } + + fn dequeue(&mut self) -> Option { + if self.head.is_null() { + return None; + } + let node = self.head; + self.head = unsafe { (*node).next }; + if self.head.is_null() { + self.tail = ptr::null_mut(); + } + unsafe { + (*node).next = ptr::null_mut(); + Some((*node).token.take().unwrap()) + } + } +} diff --git a/library/std/src/sync/mpsc/sync_tests.rs b/library/std/src/sync/mpsc/sync_tests.rs new file mode 100644 index 000000000..e58649bab --- /dev/null +++ b/library/std/src/sync/mpsc/sync_tests.rs @@ -0,0 +1,647 @@ +use super::*; +use crate::env; +use crate::thread; +use crate::time::Duration; + +pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) 
=> 1, + } +} + +#[test] +fn smoke() { + let (tx, rx) = sync_channel::(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn drop_full() { + let (tx, _rx) = sync_channel::>(1); + tx.send(Box::new(1)).unwrap(); +} + +#[test] +fn smoke_shared() { + let (tx, rx) = sync_channel::(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn recv_timeout() { + let (tx, rx) = sync_channel::(1); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(1).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); +} + +#[test] +fn smoke_threads() { + let (tx, rx) = sync_channel::(0); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_port_gone() { + let (tx, rx) = sync_channel::(0); + drop(rx); + assert!(tx.send(1).is_err()); +} + +#[test] +fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); +} + +#[test] +fn port_gone_concurrent() { + let (tx, rx) = sync_channel::(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} +} + +#[test] +fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::(0); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} +} + +#[test] +fn smoke_chan_gone() { + let (tx, rx) = sync_channel::(0); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); +} + +#[test] +fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::(0); + thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} +} + +#[test] +fn stress() { + let (tx, rx) = sync_channel::(0); + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } +} + +#[test] +fn stress_recv_timeout_two_threads() { + let (tx, rx) = sync_channel::(0); + + thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(1)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, 10000); +} + +#[test] +fn stress_recv_timeout_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::(0); + let (dtx, drx) = sync_channel::<()>(0); + + thread::spawn(move || { + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, AMT * NTHREADS); + assert!(rx.try_recv().is_err()); + + dtx.send(()).unwrap(); + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + + drop(tx); + + drx.recv().unwrap(); +} + +#[test] +fn stress_shared() { + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::(0); + let (dtx, 
drx) = sync_channel::<()>(0); + + thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()).unwrap(); + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + drx.recv().unwrap(); +} + +#[test] +fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::(0); + drop(rx); +} + +#[test] +fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::(0); + drop(tx); +} + +#[test] +fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::>(0); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); +} + +#[test] +fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = sync_channel::(0); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? + assert!(res.is_err()); +} + +#[test] +fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::>(1); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); +} + +#[test] +fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); +} + +#[test] +fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); +} + +#[test] +fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::(0); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot_single_thread_try_recv_closed_with_data() { + let (tx, rx) = sync_channel::(1); + tx.send(10).unwrap(); + drop(tx); + assert_eq!(rx.try_recv(), Ok(10)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); +} + +#[test] +fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::>(0); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); +} + +#[test] +fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::>(0); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); +} + +#[test] +fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = 
sync_channel::(0); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } +} + +#[test] +fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::(0); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } +} + +#[test] +fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::(0); + let _t = thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } +} + +#[test] +fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::>(0); + let _t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } +} + +#[test] +fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = sync_channel::>(0); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: SyncSender>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } +} + +#[test] +fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } +} + +#[test] +fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } +} + +#[test] +fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::(0); + let (total_tx, total_rx) = sync_channel::(0); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); +} + +#[test] +fn test_recv_iter_break() { + let (tx, rx) = sync_channel::(0); + let (count_tx, count_rx) = sync_channel(0); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); +} + +#[test] +fn try_recv_states() { + let (tx1, rx1) = sync_channel::(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); +} + +// This bug used to end up in a livelock inside of the Receiver destructor +// because the internal state of the Shared packet was 
corrupted +#[test] +fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); +} + +#[test] +fn send1() { + let (tx, rx) = sync_channel::(0); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + assert_eq!(tx.send(1), Ok(())); +} + +#[test] +fn send2() { + let (tx, rx) = sync_channel::(0); + let _t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); +} + +#[test] +fn send3() { + let (tx, rx) = sync_channel::(1); + assert_eq!(tx.send(1), Ok(())); + let _t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); +} + +#[test] +fn send4() { + let (tx, rx) = sync_channel::(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + let _t = thread::spawn(move || { + assert!(tx.send(1).is_err()); + done.send(()).unwrap(); + }); + let _t = thread::spawn(move || { + assert!(tx2.send(2).is_err()); + done2.send(()).unwrap(); + }); + drop(rx); + donerx.recv().unwrap(); + donerx.recv().unwrap(); +} + +#[test] +fn try_send1() { + let (tx, _rx) = sync_channel::(0); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); +} + +#[test] +fn try_send2() { + let (tx, _rx) = sync_channel::(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); +} + +#[test] +fn try_send3() { + let (tx, rx) = sync_channel::(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); +} + +#[test] +fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv().unwrap(); + } + + for _ in 0..100 { + repro() + } +} diff --git a/library/std/src/sync/mpsc/tests.rs b/library/std/src/sync/mpsc/tests.rs new file mode 100644 index 000000000..4deb3e596 --- /dev/null +++ b/library/std/src/sync/mpsc/tests.rs @@ -0,0 +1,706 @@ +use super::*; +use crate::env; +use crate::thread; +use crate::time::{Duration, Instant}; + +pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) 
=> 1, + } +} + +#[test] +fn smoke() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn drop_full() { + let (tx, _rx) = channel::>(); + tx.send(Box::new(1)).unwrap(); +} + +#[test] +fn drop_full_shared() { + let (tx, _rx) = channel::>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(Box::new(1)).unwrap(); +} + +#[test] +fn smoke_shared() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_threads() { + let (tx, rx) = channel::(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_port_gone() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(1).is_err()); +} + +#[test] +fn smoke_shared_port_gone() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(1).is_err()) +} + +#[test] +fn smoke_shared_port_gone2() { + let (tx, rx) = channel::(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); +} + +#[test] +fn port_gone_concurrent() { + let (tx, rx) = channel::(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} +} + +#[test] +fn port_gone_concurrent_shared() { + let (tx, rx) = channel::(); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} +} + +#[test] +fn smoke_chan_gone() { + let (tx, rx) = channel::(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); +} + +#[test] +fn chan_gone_concurrent() { + let (tx, rx) = channel::(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} +} + +#[test] +fn stress() { + let (tx, rx) = channel::(); + let t = thread::spawn(move || { + for _ in 0..10000 { + tx.send(1).unwrap(); + } + }); + for _ in 0..10000 { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().expect("thread panicked"); +} + +#[test] +fn stress_shared() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = channel::(); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) 
=> panic!(), + _ => {} + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + t.join().ok().expect("thread panicked"); +} + +#[test] +fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); +} + +#[test] +fn recv_from_outside_runtime() { + let (tx, rx) = channel::(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().expect("thread panicked"); +} + +#[test] +fn no_runtime() { + let (tx1, rx1) = channel::(); + let (tx2, rx2) = channel::(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); +} + +#[test] +fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::(); + drop(rx); +} + +#[test] +fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::(); + drop(tx); +} + +#[test] +fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::>(); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); +} + +#[test] +fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = channel::(); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? 
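+ // The spawned thread panicked when calling `recv()` on a channel whose sender was dropped, so `join()` returns `Err`.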
+ assert!(res.is_err()); +} + +#[test] +fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::>(); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::(); + drop(rx); + assert!(tx.send(10).is_err()); +} + +#[test] +fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); +} + +#[test] +fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); +} + +#[test] +fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::>(); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); +} + +#[test] +fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::>(); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); +} + +#[test] +fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::(); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } +} + +#[test] +fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::(); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } +} + +#[test] +fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::(); + thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } +} + +#[test] +fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::>(); + let _t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } +} + +#[test] +fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel(); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: Sender>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } +} + +#[test] +fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + 
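// A queued message is returned immediately, so the 1 ms timeout is not hit.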
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); +} + +#[test] +fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); +} + +#[test] +fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); +} + +#[test] +fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + for i in 0..stress { + let tx = tx.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); +} + +#[test] +fn very_long_recv_timeout_wont_panic() { + let (tx, rx) = channel::<()>(); + let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); + thread::sleep(Duration::from_secs(1)); + assert!(tx.send(()).is_ok()); + assert_eq!(join_handle.join().unwrap(), Ok(())); +} + +#[test] +fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..10000 { + tx.send(()).unwrap(); + } + for _ in 0..10000 { + rx.recv().unwrap(); + } +} + +#[test] +fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); +} + +#[test] +fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } +} + +#[test] +fn test_nested_recv_iter() { + let (tx, rx) = channel::(); + let (total_tx, total_rx) = channel::(); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); +} + +#[test] +fn test_recv_iter_break() { + let (tx, rx) = channel::(); + let (count_tx, count_rx) = channel(); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 
4); +} + +#[test] +fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); +} + +#[test] +fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn try_recv_states() { + let (tx1, rx1) = channel::(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); +} + +// This bug used to end up in a livelock inside of the Receiver destructor +// because the internal state of the Shared packet was corrupted +#[test] +fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); +} + +#[test] +fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); +} diff --git a/library/std/src/sync/mutex.rs b/library/std/src/sync/mutex.rs new file mode 100644 index 000000000..e0d13cd64 --- /dev/null +++ b/library/std/src/sync/mutex.rs @@ -0,0 +1,553 @@ +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +use crate::cell::UnsafeCell; +use crate::fmt; +use crate::ops::{Deref, DerefMut}; +use crate::sync::{poison, LockResult, TryLockError, TryLockResult}; +use crate::sys_common::mutex as sys; + +/// A mutual exclusion primitive useful for protecting shared data +/// +/// This mutex will block threads waiting for the lock to become available. The +/// mutex can be created via a [`new`] constructor. Each mutex has a type parameter +/// which represents the data that it is protecting. The data can only be accessed +/// through the RAII guards returned from [`lock`] and [`try_lock`], which +/// guarantees that the data is only ever accessed when the mutex is locked. 
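+///
+/// As a minimal illustrative sketch (the examples further below show shared,
+/// multi-threaded use), locking simply yields a guard that dereferences to the
+/// protected data:
+///
+/// ```
+/// use std::sync::Mutex;
+///
+/// let m = Mutex::new(5);
+/// {
+///     let mut guard = m.lock().unwrap();
+///     *guard += 1;
+/// } // the guard is dropped here, unlocking the mutex
+/// assert_eq!(*m.lock().unwrap(), 6);
+/// ```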
+/// +/// # Poisoning +/// +/// The mutexes in this module implement a strategy called "poisoning" where a +/// mutex is considered poisoned whenever a thread panics while holding the +/// mutex. Once a mutex is poisoned, all other threads are unable to access the +/// data by default as it is likely tainted (some invariant is not being +/// upheld). +/// +/// For a mutex, this means that the [`lock`] and [`try_lock`] methods return a +/// [`Result`] which indicates whether a mutex has been poisoned or not. Most +/// usage of a mutex will simply [`unwrap()`] these results, propagating panics +/// among threads to ensure that a possibly invalid invariant is not witnessed. +/// +/// A poisoned mutex, however, does not prevent all access to the underlying +/// data. The [`PoisonError`] type has an [`into_inner`] method which will return +/// the guard that would have otherwise been returned on a successful lock. This +/// allows access to the data, despite the lock being poisoned. +/// +/// [`new`]: Self::new +/// [`lock`]: Self::lock +/// [`try_lock`]: Self::try_lock +/// [`unwrap()`]: Result::unwrap +/// [`PoisonError`]: super::PoisonError +/// [`into_inner`]: super::PoisonError::into_inner +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// use std::sync::mpsc::channel; +/// +/// const N: usize = 10; +/// +/// // Spawn a few threads to increment a shared variable (non-atomically), and +/// // let the main thread know once all increments are done. +/// // +/// // Here we're using an Arc to share memory among threads, and the data inside +/// // the Arc is protected with a mutex. +/// let data = Arc::new(Mutex::new(0)); +/// +/// let (tx, rx) = channel(); +/// for _ in 0..N { +/// let (data, tx) = (Arc::clone(&data), tx.clone()); +/// thread::spawn(move || { +/// // The shared state can only be accessed once the lock is held. +/// // Our non-atomic increment is safe because we're the only thread +/// // which can access the shared state when the lock is held. +/// // +/// // We unwrap() the return value to assert that we are not expecting +/// // threads to ever fail while holding the lock. +/// let mut data = data.lock().unwrap(); +/// *data += 1; +/// if *data == N { +/// tx.send(()).unwrap(); +/// } +/// // the lock is unlocked here when `data` goes out of scope. +/// }); +/// } +/// +/// rx.recv().unwrap(); +/// ``` +/// +/// To recover from a poisoned mutex: +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// +/// let lock = Arc::new(Mutex::new(0_u32)); +/// let lock2 = Arc::clone(&lock); +/// +/// let _ = thread::spawn(move || -> () { +/// // This thread will acquire the mutex first, unwrapping the result of +/// // `lock` because the lock has not been poisoned. +/// let _guard = lock2.lock().unwrap(); +/// +/// // This panic while holding the lock (`_guard` is in scope) will poison +/// // the mutex. +/// panic!(); +/// }).join(); +/// +/// // The lock is poisoned by this point, but the returned result can be +/// // pattern matched on to return the underlying guard on both branches. +/// let mut guard = match lock.lock() { +/// Ok(guard) => guard, +/// Err(poisoned) => poisoned.into_inner(), +/// }; +/// +/// *guard += 1; +/// ``` +/// +/// It is sometimes necessary to manually drop the mutex guard to unlock it +/// sooner than the end of the enclosing scope. 
+/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// +/// const N: usize = 3; +/// +/// let data_mutex = Arc::new(Mutex::new(vec![1, 2, 3, 4])); +/// let res_mutex = Arc::new(Mutex::new(0)); +/// +/// let mut threads = Vec::with_capacity(N); +/// (0..N).for_each(|_| { +/// let data_mutex_clone = Arc::clone(&data_mutex); +/// let res_mutex_clone = Arc::clone(&res_mutex); +/// +/// threads.push(thread::spawn(move || { +/// let mut data = data_mutex_clone.lock().unwrap(); +/// // This is the result of some important and long-ish work. +/// let result = data.iter().fold(0, |acc, x| acc + x * 2); +/// data.push(result); +/// drop(data); +/// *res_mutex_clone.lock().unwrap() += result; +/// })); +/// }); +/// +/// let mut data = data_mutex.lock().unwrap(); +/// // This is the result of some important and long-ish work. +/// let result = data.iter().fold(0, |acc, x| acc + x * 2); +/// data.push(result); +/// // We drop the `data` explicitly because it's not necessary anymore and the +/// // thread still has work to do. This allow other threads to start working on +/// // the data immediately, without waiting for the rest of the unrelated work +/// // to be done here. +/// // +/// // It's even more important here than in the threads because we `.join` the +/// // threads after that. If we had not dropped the mutex guard, a thread could +/// // be waiting forever for it, causing a deadlock. +/// drop(data); +/// // Here the mutex guard is not assigned to a variable and so, even if the +/// // scope does not end after this line, the mutex is still released: there is +/// // no deadlock. +/// *res_mutex.lock().unwrap() += result; +/// +/// threads.into_iter().for_each(|thread| { +/// thread +/// .join() +/// .expect("The thread creating or execution failed !") +/// }); +/// +/// assert_eq!(*res_mutex.lock().unwrap(), 800); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[cfg_attr(not(test), rustc_diagnostic_item = "Mutex")] +pub struct Mutex { + inner: sys::MovableMutex, + poison: poison::Flag, + data: UnsafeCell, +} + +// these are the only places where `T: Send` matters; all other +// functionality works fine on a single thread. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Mutex {} +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Sync for Mutex {} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its +/// [`Deref`] and [`DerefMut`] implementations. +/// +/// This structure is created by the [`lock`] and [`try_lock`] methods on +/// [`Mutex`]. +/// +/// [`lock`]: Mutex::lock +/// [`try_lock`]: Mutex::try_lock +#[must_use = "if unused the Mutex will immediately unlock"] +#[must_not_suspend = "holding a MutexGuard across suspend \ + points can cause deadlocks, delays, \ + and cause Futures to not implement `Send`"] +#[stable(feature = "rust1", since = "1.0.0")] +#[clippy::has_significant_drop] +pub struct MutexGuard<'a, T: ?Sized + 'a> { + lock: &'a Mutex, + poison: poison::Guard, +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl !Send for MutexGuard<'_, T> {} +#[stable(feature = "mutexguard", since = "1.19.0")] +unsafe impl Sync for MutexGuard<'_, T> {} + +impl Mutex { + /// Creates a new mutex in an unlocked state ready for use. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Mutex; + /// + /// let mutex = Mutex::new(0); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] + #[inline] + pub const fn new(t: T) -> Mutex { + Mutex { + inner: sys::MovableMutex::new(), + poison: poison::Flag::new(), + data: UnsafeCell::new(t), + } + } +} + +impl Mutex { + /// Acquires a mutex, blocking the current thread until it is able to do so. + /// + /// This function will block the local thread until it is available to acquire + /// the mutex. Upon returning, the thread is the only thread with the lock + /// held. An RAII guard is returned to allow scoped unlock of the lock. When + /// the guard goes out of scope, the mutex will be unlocked. + /// + /// The exact behavior on locking a mutex in the thread which already holds + /// the lock is left unspecified. However, this function will not return on + /// the second call (it might panic or deadlock, for example). + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return an error once the mutex is acquired. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by + /// the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// *c_mutex.lock().unwrap() = 10; + /// }).join().expect("thread::spawn failed"); + /// assert_eq!(*mutex.lock().unwrap(), 10); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn lock(&self) -> LockResult> { + unsafe { + self.inner.raw_lock(); + MutexGuard::new(self) + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then [`Err`] is returned. + /// Otherwise, an RAII guard is returned. The lock will be unlocked when the + /// guard is dropped. + /// + /// This function does not block. + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return the [`Poisoned`] error if the mutex would + /// otherwise be acquired. + /// + /// If the mutex could not be acquired because it is already locked, then + /// this call will return the [`WouldBlock`] error. + /// + /// [`Poisoned`]: TryLockError::Poisoned + /// [`WouldBlock`]: TryLockError::WouldBlock + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// thread::spawn(move || { + /// let mut lock = c_mutex.try_lock(); + /// if let Ok(ref mut mutex) = lock { + /// **mutex = 10; + /// } else { + /// println!("try_lock failed"); + /// } + /// }).join().expect("thread::spawn failed"); + /// assert_eq!(*mutex.lock().unwrap(), 10); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_lock(&self) -> TryLockResult> { + unsafe { + if self.inner.try_lock() { + Ok(MutexGuard::new(self)?) + } else { + Err(TryLockError::WouldBlock) + } + } + } + + /// Immediately drops the guard, and consequently unlocks the mutex. + /// + /// This function is equivalent to calling [`drop`] on the guard but is more self-documenting. + /// Alternately, the guard will be automatically dropped when it goes out of scope. 
+ /// + /// ``` + /// #![feature(mutex_unlock)] + /// + /// use std::sync::Mutex; + /// let mutex = Mutex::new(0); + /// + /// let mut guard = mutex.lock().unwrap(); + /// *guard += 20; + /// Mutex::unlock(guard); + /// ``` + #[unstable(feature = "mutex_unlock", issue = "81872")] + pub fn unlock(guard: MutexGuard<'_, T>) { + drop(guard); + } + + /// Determines whether the mutex is poisoned. + /// + /// If another thread is active, the mutex can still become poisoned at any + /// time. You should not trust a `false` value for program correctness + /// without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_mutex.lock().unwrap(); + /// panic!(); // the mutex gets poisoned + /// }).join(); + /// assert_eq!(mutex.is_poisoned(), true); + /// ``` + #[inline] + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn is_poisoned(&self) -> bool { + self.poison.get() + } + + /// Clear the poisoned state from a mutex + /// + /// If the mutex is poisoned, it will remain poisoned until this function is called. This + /// allows recovering from a poisoned state and marking that it has recovered. For example, if + /// the value is overwritten by a known-good value, then the mutex can be marked as + /// un-poisoned. Or possibly, the value could be inspected to determine if it is in a + /// consistent state, and if so the poison is removed. + /// + /// # Examples + /// + /// ``` + /// #![feature(mutex_unpoison)] + /// + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(0)); + /// let c_mutex = Arc::clone(&mutex); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_mutex.lock().unwrap(); + /// panic!(); // the mutex gets poisoned + /// }).join(); + /// + /// assert_eq!(mutex.is_poisoned(), true); + /// let x = mutex.lock().unwrap_or_else(|mut e| { + /// **e.get_mut() = 1; + /// mutex.clear_poison(); + /// e.into_inner() + /// }); + /// assert_eq!(mutex.is_poisoned(), false); + /// assert_eq!(*x, 1); + /// ``` + #[inline] + #[unstable(feature = "mutex_unpoison", issue = "96469")] + pub fn clear_poison(&self) { + self.poison.clear(); + } + + /// Consumes this mutex, returning the underlying data. + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return an error instead. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Mutex; + /// + /// let mutex = Mutex::new(0); + /// assert_eq!(mutex.into_inner().unwrap(), 0); + /// ``` + #[stable(feature = "mutex_into_inner", since = "1.6.0")] + pub fn into_inner(self) -> LockResult + where + T: Sized, + { + let data = self.data.into_inner(); + poison::map_result(self.poison.borrow(), |()| data) + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `Mutex` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Errors + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return an error instead. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Mutex; + /// + /// let mut mutex = Mutex::new(0); + /// *mutex.get_mut().unwrap() = 10; + /// assert_eq!(*mutex.lock().unwrap(), 10); + /// ``` + #[stable(feature = "mutex_get_mut", since = "1.6.0")] + pub fn get_mut(&mut self) -> LockResult<&mut T> { + let data = self.data.get_mut(); + poison::map_result(self.poison.borrow(), |()| data) + } +} + +#[stable(feature = "mutex_from", since = "1.24.0")] +impl From for Mutex { + /// Creates a new mutex in an unlocked state ready for use. + /// This is equivalent to [`Mutex::new`]. + fn from(t: T) -> Self { + Mutex::new(t) + } +} + +#[stable(feature = "mutex_default", since = "1.10.0")] +impl Default for Mutex { + /// Creates a `Mutex`, with the `Default` value for T. + fn default() -> Mutex { + Mutex::new(Default::default()) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut d = f.debug_struct("Mutex"); + match self.try_lock() { + Ok(guard) => { + d.field("data", &&*guard); + } + Err(TryLockError::Poisoned(err)) => { + d.field("data", &&**err.get_ref()); + } + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + d.field("data", &LockedPlaceholder); + } + } + d.field("poisoned", &self.poison.get()); + d.finish_non_exhaustive() + } +} + +impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> { + unsafe fn new(lock: &'mutex Mutex) -> LockResult> { + poison::map_result(lock.poison.guard(), |guard| MutexGuard { lock, poison: guard }) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Deref for MutexGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for MutexGuard<'_, T> { + #[inline] + fn drop(&mut self) { + unsafe { + self.lock.poison.done(&self.poison); + self.lock.inner.raw_unlock(); + } + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl fmt::Display for MutexGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::MovableMutex { + &guard.lock.inner +} + +pub fn guard_poison<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a poison::Flag { + &guard.lock.poison +} diff --git a/library/std/src/sync/mutex/tests.rs b/library/std/src/sync/mutex/tests.rs new file mode 100644 index 000000000..93900566f --- /dev/null +++ b/library/std/src/sync/mutex/tests.rs @@ -0,0 +1,238 @@ +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::channel; +use crate::sync::{Arc, Condvar, Mutex}; +use crate::thread; + +struct Packet(Arc<(Mutex, Condvar)>); + +#[derive(Eq, PartialEq, Debug)] +struct NonCopy(i32); + +#[test] +fn smoke() { + let m = Mutex::new(()); + drop(m.lock().unwrap()); + drop(m.lock().unwrap()); +} + +#[test] +fn lots_and_lots() { + const J: u32 = 1000; + const K: u32 = 3; + + let m = Arc::new(Mutex::new(0)); + + fn inc(m: 
&Mutex) { + for _ in 0..J { + *m.lock().unwrap() += 1; + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(*m.lock().unwrap(), J * K * 2); +} + +#[test] +fn try_lock() { + let m = Mutex::new(()); + *m.try_lock().unwrap() = (); +} + +#[test] +fn test_into_inner() { + let m = Mutex::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); +} + +#[test] +fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = Mutex::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); +} + +#[test] +fn test_into_inner_poison() { + let m = Arc::new(Mutex::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.lock().unwrap(); + panic!("test panic in inner thread to poison mutex"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned Mutex is Ok: {x:?}"), + } +} + +#[test] +fn test_get_mut() { + let mut m = Mutex::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); +} + +#[test] +fn test_get_mut_poison() { + let m = Arc::new(Mutex::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.lock().unwrap(); + panic!("test panic in inner thread to poison mutex"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned Mutex is Ok: {x:?}"), + } +} + +#[test] +fn test_mutex_arc_condvar() { + let packet = Packet(Arc::new((Mutex::new(false), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + // wait until parent gets in + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let mut lock = lock.lock().unwrap(); + *lock = true; + cvar.notify_one(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock().unwrap(); + tx.send(()).unwrap(); + assert!(!*lock); + while !*lock { + lock = cvar.wait(lock).unwrap(); + } +} + +#[test] +fn test_arc_condvar_poison() { + let packet = Packet(Arc::new((Mutex::new(1), Condvar::new()))); + let packet2 = Packet(packet.0.clone()); + let (tx, rx) = channel(); + + let _t = thread::spawn(move || -> () { + rx.recv().unwrap(); + let &(ref lock, ref cvar) = &*packet2.0; + let _g = lock.lock().unwrap(); + cvar.notify_one(); + // Parent should fail when it wakes up. + panic!(); + }); + + let &(ref lock, ref cvar) = &*packet.0; + let mut lock = lock.lock().unwrap(); + tx.send(()).unwrap(); + while *lock == 1 { + match cvar.wait(lock) { + Ok(l) => { + lock = l; + assert_eq!(*lock, 1); + } + Err(..) 
=> break, + } + } +} + +#[test] +fn test_mutex_arc_poison() { + let arc = Arc::new(Mutex::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _ = thread::spawn(move || { + let lock = arc2.lock().unwrap(); + assert_eq!(*lock, 2); + }) + .join(); + assert!(arc.lock().is_err()); + assert!(arc.is_poisoned()); +} + +#[test] +fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(Mutex::new(1)); + let arc2 = Arc::new(Mutex::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock().unwrap(); + let lock2 = lock.lock().unwrap(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); +} + +#[test] +fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(Mutex::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock().unwrap() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock().unwrap(); + assert_eq!(*lock, 2); +} + +#[test] +fn test_mutex_unsized() { + let mutex: &Mutex<[i32]> = &Mutex::new([1, 2, 3]); + { + let b = &mut *mutex.lock().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock().unwrap(), comp); +} diff --git a/library/std/src/sync/once.rs b/library/std/src/sync/once.rs new file mode 100644 index 000000000..a7feea588 --- /dev/null +++ b/library/std/src/sync/once.rs @@ -0,0 +1,580 @@ +//! A "once initialization" primitive +//! +//! This primitive is meant to be used to run one-time initialization. An +//! example use case would be for initializing an FFI library. + +// A "once" is a relatively simple primitive, and it's also typically provided +// by the OS as well (see `pthread_once` or `InitOnceExecuteOnce`). The OS +// primitives, however, tend to have surprising restrictions, such as the Unix +// one doesn't allow an argument to be passed to the function. +// +// As a result, we end up implementing it ourselves in the standard library. +// This also gives us the opportunity to optimize the implementation a bit which +// should help the fast path on call sites. Consequently, let's explain how this +// primitive works now! +// +// So to recap, the guarantees of a Once are that it will call the +// initialization closure at most once, and it will never return until the one +// that's running has finished running. This means that we need some form of +// blocking here while the custom callback is running at the very least. +// Additionally, we add on the restriction of **poisoning**. Whenever an +// initialization closure panics, the Once enters a "poisoned" state which means +// that all future calls will immediately panic as well. +// +// So to implement this, one might first reach for a `Mutex`, but those cannot +// be put into a `static`. It also gets a lot harder with poisoning to figure +// out when the mutex needs to be deallocated because it's not after the closure +// finishes, but after the first successful closure finishes. +// +// All in all, this is instead implemented with atomics and lock-free +// operations! Whee! Each `Once` has one word of atomic state, and this state is +// CAS'd on to determine what to do. There are four possible state of a `Once`: +// +// * Incomplete - no initialization has run yet, and no thread is currently +// using the Once. 
+// * Poisoned - some thread has previously attempted to initialize the Once, but +// it panicked, so the Once is now poisoned. There are no other +// threads currently accessing this Once. +// * Running - some thread is currently attempting to run initialization. It may +// succeed, so all future threads need to wait for it to finish. +// Note that this state is accompanied with a payload, described +// below. +// * Complete - initialization has completed and all future calls should finish +// immediately. +// +// With 4 states we need 2 bits to encode this, and we use the remaining bits +// in the word we have allocated as a queue of threads waiting for the thread +// responsible for entering the RUNNING state. This queue is just a linked list +// of Waiter nodes which is monotonically increasing in size. Each node is +// allocated on the stack, and whenever the running closure finishes it will +// consume the entire queue and notify all waiters they should try again. +// +// You'll find a few more details in the implementation, but that's the gist of +// it! +// +// Atomic orderings: +// When running `Once` we deal with multiple atomics: +// `Once.state_and_queue` and an unknown number of `Waiter.signaled`. +// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the +// result of the `Once`, and (3) for synchronizing `Waiter` nodes. +// - At the end of the `call_inner` function we have to make sure the result +// of the `Once` is acquired. So every load which can be the only one to +// load COMPLETED must have at least Acquire ordering, which means all +// three of them. +// - `WaiterQueue::Drop` is the only place that may store COMPLETED, and +// must do so with Release ordering to make the result available. +// - `wait` inserts `Waiter` nodes as a pointer in `state_and_queue`, and +// needs to make the nodes available with Release ordering. The load in +// its `compare_exchange` can be Relaxed because it only has to compare +// the atomic, not to read other data. +// - `WaiterQueue::Drop` must see the `Waiter` nodes, so it must load +// `state_and_queue` with Acquire ordering. +// - There is just one store where `state_and_queue` is used only as a +// state flag, without having to synchronize data: switching the state +// from INCOMPLETE to RUNNING in `call_inner`. This store can be Relaxed, +// but the read has to be Acquire because of the requirements mentioned +// above. +// * `Waiter.signaled` is both used as a flag, and to protect a field with +// interior mutability in `Waiter`. `Waiter.thread` is changed in +// `WaiterQueue::Drop` which then sets `signaled` with Release ordering. +// After `wait` loads `signaled` with Acquire and sees it is true, it needs to +// see the changes to drop the `Waiter` struct correctly. +// * There is one place where the two atomics `Once.state_and_queue` and +// `Waiter.signaled` come together, and might be reordered by the compiler or +// processor. Because both use Acquire ordering such a reordering is not +// allowed, so no need for SeqCst. + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +use crate::cell::Cell; +use crate::fmt; +use crate::marker; +use crate::panic::{RefUnwindSafe, UnwindSafe}; +use crate::ptr; +use crate::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; +use crate::thread::{self, Thread}; + +type Masked = (); + +/// A synchronization primitive which can be used to run a one-time global +/// initialization. Useful for one-time initialization for FFI or related +/// functionality. 
This type can only be constructed with [`Once::new()`]. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Once; +/// +/// static START: Once = Once::new(); +/// +/// START.call_once(|| { +/// // run initialization here +/// }); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct Once { + // `state_and_queue` is actually a pointer to a `Waiter` with extra state + // bits, so we add the `PhantomData` appropriately. + state_and_queue: AtomicPtr, + _marker: marker::PhantomData<*const Waiter>, +} + +// The `PhantomData` of a raw pointer removes these two auto traits, but we +// enforce both below in the implementation so this should be safe to add. +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Sync for Once {} +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for Once {} + +#[stable(feature = "sync_once_unwind_safe", since = "1.59.0")] +impl UnwindSafe for Once {} + +#[stable(feature = "sync_once_unwind_safe", since = "1.59.0")] +impl RefUnwindSafe for Once {} + +/// State yielded to [`Once::call_once_force()`]’s closure parameter. The state +/// can be used to query the poison status of the [`Once`]. +#[stable(feature = "once_poison", since = "1.51.0")] +#[derive(Debug)] +pub struct OnceState { + poisoned: bool, + set_state_on_drop_to: Cell<*mut Masked>, +} + +/// Initialization value for static [`Once`] values. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Once, ONCE_INIT}; +/// +/// static START: Once = ONCE_INIT; +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[deprecated( + since = "1.38.0", + note = "the `new` function is now preferred", + suggestion = "Once::new()" +)] +pub const ONCE_INIT: Once = Once::new(); + +// Four states that a Once can be in, encoded into the lower bits of +// `state_and_queue` in the Once structure. +const INCOMPLETE: usize = 0x0; +const POISONED: usize = 0x1; +const RUNNING: usize = 0x2; +const COMPLETE: usize = 0x3; + +// Mask to learn about the state. All other bits are the queue of waiters if +// this is in the RUNNING state. +const STATE_MASK: usize = 0x3; + +// Representation of a node in the linked list of waiters, used while in the +// RUNNING state. +// Note: `Waiter` can't hold a mutable pointer to the next thread, because then +// `wait` would both hand out a mutable reference to its `Waiter` node, and keep +// a shared reference to check `signaled`. Instead we hold shared references and +// use interior mutability. +#[repr(align(4))] // Ensure the two lower bits are free to use as state bits. +struct Waiter { + thread: Cell>, + signaled: AtomicBool, + next: *const Waiter, +} + +// Head of a linked list of waiters. +// Every node is a struct on the stack of a waiting thread. +// Will wake up the waiters when it gets dropped, i.e. also on panic. +struct WaiterQueue<'a> { + state_and_queue: &'a AtomicPtr, + set_state_on_drop_to: *mut Masked, +} + +impl Once { + /// Creates a new `Once` value. + #[inline] + #[stable(feature = "once_new", since = "1.2.0")] + #[rustc_const_stable(feature = "const_once_new", since = "1.32.0")] + #[must_use] + pub const fn new() -> Once { + Once { + state_and_queue: AtomicPtr::new(ptr::invalid_mut(INCOMPLETE)), + _marker: marker::PhantomData, + } + } + + /// Performs an initialization routine once and only once. The given closure + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. 
+ /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it might not be the closure specified). It is also + /// guaranteed that any memory writes performed by the executed closure can + /// be reliably observed by other threads at this point (there is a + /// happens-before relation between the closure and code executing after the + /// return). + /// + /// If the given closure recursively invokes `call_once` on the same [`Once`] + /// instance the exact behavior is not specified, allowed outcomes are + /// a panic or a deadlock. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Once; + /// + /// static mut VAL: usize = 0; + /// static INIT: Once = Once::new(); + /// + /// // Accessing a `static mut` is unsafe much of the time, but if we do so + /// // in a synchronized fashion (e.g., write once or read all) then we're + /// // good to go! + /// // + /// // This function will only call `expensive_computation` once, and will + /// // otherwise always return the value returned from the first invocation. + /// fn get_cached_val() -> usize { + /// unsafe { + /// INIT.call_once(|| { + /// VAL = expensive_computation(); + /// }); + /// VAL + /// } + /// } + /// + /// fn expensive_computation() -> usize { + /// // ... + /// # 2 + /// } + /// ``` + /// + /// # Panics + /// + /// The closure `f` will only be executed once if this is called + /// concurrently amongst many threads. If that closure panics, however, then + /// it will *poison* this [`Once`] instance, causing all future invocations of + /// `call_once` to also panic. + /// + /// This is similar to [poisoning with mutexes][poison]. + /// + /// [poison]: struct.Mutex.html#poisoning + #[stable(feature = "rust1", since = "1.0.0")] + #[track_caller] + pub fn call_once(&self, f: F) + where + F: FnOnce(), + { + // Fast path check + if self.is_completed() { + return; + } + + let mut f = Some(f); + self.call_inner(false, &mut |_| f.take().unwrap()()); + } + + /// Performs the same function as [`call_once()`] except ignores poisoning. + /// + /// Unlike [`call_once()`], if this [`Once`] has been poisoned (i.e., a previous + /// call to [`call_once()`] or [`call_once_force()`] caused a panic), calling + /// [`call_once_force()`] will still invoke the closure `f` and will _not_ + /// result in an immediate panic. If `f` panics, the [`Once`] will remain + /// in a poison state. If `f` does _not_ panic, the [`Once`] will no + /// longer be in a poison state and all future calls to [`call_once()`] or + /// [`call_once_force()`] will be no-ops. + /// + /// The closure `f` is yielded a [`OnceState`] structure which can be used + /// to query the poison status of the [`Once`]. 
+ /// + /// [`call_once()`]: Once::call_once + /// [`call_once_force()`]: Once::call_once_force + /// + /// # Examples + /// + /// ``` + /// use std::sync::Once; + /// use std::thread; + /// + /// static INIT: Once = Once::new(); + /// + /// // poison the once + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// + /// // poisoning propagates + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| {}); + /// }); + /// assert!(handle.join().is_err()); + /// + /// // call_once_force will still run and reset the poisoned state + /// INIT.call_once_force(|state| { + /// assert!(state.is_poisoned()); + /// }); + /// + /// // once any success happens, we stop propagating the poison + /// INIT.call_once(|| {}); + /// ``` + #[stable(feature = "once_poison", since = "1.51.0")] + pub fn call_once_force(&self, f: F) + where + F: FnOnce(&OnceState), + { + // Fast path check + if self.is_completed() { + return; + } + + let mut f = Some(f); + self.call_inner(true, &mut |p| f.take().unwrap()(p)); + } + + /// Returns `true` if some [`call_once()`] call has completed + /// successfully. Specifically, `is_completed` will return false in + /// the following situations: + /// * [`call_once()`] was not called at all, + /// * [`call_once()`] was called, but has not yet completed, + /// * the [`Once`] instance is poisoned + /// + /// This function returning `false` does not mean that [`Once`] has not been + /// executed. For example, it may have been executed in the time between + /// when `is_completed` starts executing and when it returns, in which case + /// the `false` return value would be stale (but still permissible). + /// + /// [`call_once()`]: Once::call_once + /// + /// # Examples + /// + /// ``` + /// use std::sync::Once; + /// + /// static INIT: Once = Once::new(); + /// + /// assert_eq!(INIT.is_completed(), false); + /// INIT.call_once(|| { + /// assert_eq!(INIT.is_completed(), false); + /// }); + /// assert_eq!(INIT.is_completed(), true); + /// ``` + /// + /// ``` + /// use std::sync::Once; + /// use std::thread; + /// + /// static INIT: Once = Once::new(); + /// + /// assert_eq!(INIT.is_completed(), false); + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// assert_eq!(INIT.is_completed(), false); + /// ``` + #[stable(feature = "once_is_completed", since = "1.43.0")] + #[inline] + pub fn is_completed(&self) -> bool { + // An `Acquire` load is enough because that makes all the initialization + // operations visible to us, and, this being a fast path, weaker + // ordering helps with performance. This `Acquire` synchronizes with + // `Release` operations on the slow path. + self.state_and_queue.load(Ordering::Acquire).addr() == COMPLETE + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `call_once` (this isn't exactly a trivial or small implementation). + // + // Additionally, this is tagged with `#[cold]` as it should indeed be cold + // and it helps let LLVM know that calls to this function should be off the + // fast path. Essentially, this should help generate more straight line code + // in LLVM. + // + // Finally, this takes an `FnMut` instead of a `FnOnce` because there's + // currently no way to take an `FnOnce` and call it via virtual dispatch + // without some allocation overhead. 
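// A minimal illustrative sketch of the pattern described in the comment
// above: the caller's `FnOnce` closure is parked in an `Option` and invoked
// exactly once through a non-generic `&mut dyn FnMut`, so the large slow-path
// body is compiled once rather than once per closure type. The names
// `run_slow_path` and `call_generic` are invented for this sketch and are not
// part of the patch itself.
fn run_slow_path(body: &mut dyn FnMut()) {
    // Only ever sees a trait object, so it is never monomorphized per caller.
    body();
}

fn call_generic<F: FnOnce()>(f: F) {
    // `take()` moves the `FnOnce` out on the first call; a second call would
    // panic on the `unwrap`, which matches the "run exactly once" contract.
    let mut f = Some(f);
    run_slow_path(&mut || f.take().unwrap()());
}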
+ #[cold] + #[track_caller] + fn call_inner(&self, ignore_poisoning: bool, init: &mut dyn FnMut(&OnceState)) { + let mut state_and_queue = self.state_and_queue.load(Ordering::Acquire); + loop { + match state_and_queue.addr() { + COMPLETE => break, + POISONED if !ignore_poisoning => { + // Panic to propagate the poison. + panic!("Once instance has previously been poisoned"); + } + POISONED | INCOMPLETE => { + // Try to register this thread as the one RUNNING. + let exchange_result = self.state_and_queue.compare_exchange( + state_and_queue, + ptr::invalid_mut(RUNNING), + Ordering::Acquire, + Ordering::Acquire, + ); + if let Err(old) = exchange_result { + state_and_queue = old; + continue; + } + // `waiter_queue` will manage other waiting threads, and + // wake them up on drop. + let mut waiter_queue = WaiterQueue { + state_and_queue: &self.state_and_queue, + set_state_on_drop_to: ptr::invalid_mut(POISONED), + }; + // Run the initialization function, letting it know if we're + // poisoned or not. + let init_state = OnceState { + poisoned: state_and_queue.addr() == POISONED, + set_state_on_drop_to: Cell::new(ptr::invalid_mut(COMPLETE)), + }; + init(&init_state); + waiter_queue.set_state_on_drop_to = init_state.set_state_on_drop_to.get(); + break; + } + _ => { + // All other values must be RUNNING with possibly a + // pointer to the waiter queue in the more significant bits. + assert!(state_and_queue.addr() & STATE_MASK == RUNNING); + wait(&self.state_and_queue, state_and_queue); + state_and_queue = self.state_and_queue.load(Ordering::Acquire); + } + } + } + } +} + +fn wait(state_and_queue: &AtomicPtr, mut current_state: *mut Masked) { + // Note: the following code was carefully written to avoid creating a + // mutable reference to `node` that gets aliased. + loop { + // Don't queue this thread if the status is no longer running, + // otherwise we will not be woken up. + if current_state.addr() & STATE_MASK != RUNNING { + return; + } + + // Create the node for our current thread. + let node = Waiter { + thread: Cell::new(Some(thread::current())), + signaled: AtomicBool::new(false), + next: current_state.with_addr(current_state.addr() & !STATE_MASK) as *const Waiter, + }; + let me = &node as *const Waiter as *const Masked as *mut Masked; + + // Try to slide in the node at the head of the linked list, making sure + // that another thread didn't just replace the head of the linked list. + let exchange_result = state_and_queue.compare_exchange( + current_state, + me.with_addr(me.addr() | RUNNING), + Ordering::Release, + Ordering::Relaxed, + ); + if let Err(old) = exchange_result { + current_state = old; + continue; + } + + // We have enqueued ourselves, now lets wait. + // It is important not to return before being signaled, otherwise we + // would drop our `Waiter` node and leave a hole in the linked list + // (and a dangling reference). Guard against spurious wakeups by + // reparking ourselves until we are signaled. + while !node.signaled.load(Ordering::Acquire) { + // If the managing thread happens to signal and unpark us before we + // can park ourselves, the result could be this thread never gets + // unparked. Luckily `park` comes with the guarantee that if it got + // an `unpark` just before on an unparked thread it does not park. 
+ thread::park(); + } + break; + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for Once { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Once").finish_non_exhaustive() + } +} + +impl Drop for WaiterQueue<'_> { + fn drop(&mut self) { + // Swap out our state with however we finished. + let state_and_queue = + self.state_and_queue.swap(self.set_state_on_drop_to, Ordering::AcqRel); + + // We should only ever see an old state which was RUNNING. + assert_eq!(state_and_queue.addr() & STATE_MASK, RUNNING); + + // Walk the entire linked list of waiters and wake them up (in lifo + // order, last to register is first to wake up). + unsafe { + // Right after setting `node.signaled = true` the other thread may + // free `node` if there happens to be has a spurious wakeup. + // So we have to take out the `thread` field and copy the pointer to + // `next` first. + let mut queue = + state_and_queue.with_addr(state_and_queue.addr() & !STATE_MASK) as *const Waiter; + while !queue.is_null() { + let next = (*queue).next; + let thread = (*queue).thread.take().unwrap(); + (*queue).signaled.store(true, Ordering::Release); + // ^- FIXME (maybe): This is another case of issue #55005 + // `store()` has a potentially dangling ref to `signaled`. + queue = next; + thread.unpark(); + } + } + } +} + +impl OnceState { + /// Returns `true` if the associated [`Once`] was poisoned prior to the + /// invocation of the closure passed to [`Once::call_once_force()`]. + /// + /// # Examples + /// + /// A poisoned [`Once`]: + /// + /// ``` + /// use std::sync::Once; + /// use std::thread; + /// + /// static INIT: Once = Once::new(); + /// + /// // poison the once + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// + /// INIT.call_once_force(|state| { + /// assert!(state.is_poisoned()); + /// }); + /// ``` + /// + /// An unpoisoned [`Once`]: + /// + /// ``` + /// use std::sync::Once; + /// + /// static INIT: Once = Once::new(); + /// + /// INIT.call_once_force(|state| { + /// assert!(!state.is_poisoned()); + /// }); + #[stable(feature = "once_poison", since = "1.51.0")] + pub fn is_poisoned(&self) -> bool { + self.poisoned + } + + /// Poison the associated [`Once`] without explicitly panicking. 
+ // NOTE: This is currently only exposed for the `lazy` module + pub(crate) fn poison(&self) { + self.set_state_on_drop_to.set(ptr::invalid_mut(POISONED)); + } +} diff --git a/library/std/src/sync/once/tests.rs b/library/std/src/sync/once/tests.rs new file mode 100644 index 000000000..0c35597e1 --- /dev/null +++ b/library/std/src/sync/once/tests.rs @@ -0,0 +1,116 @@ +use super::Once; +use crate::panic; +use crate::sync::mpsc::channel; +use crate::thread; + +#[test] +fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); +} + +#[test] +fn stampede_once() { + static O: Once = Once::new(); + static mut RUN: bool = false; + + let (tx, rx) = channel(); + for _ in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + tx.send(()).unwrap(); + }); + } + + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } +} + +#[test] +fn poison_bad() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + O.call_once(|| {}); + }); + assert!(t.is_err()); + + // we can subvert poisoning, however + let mut called = false; + O.call_once_force(|p| { + called = true; + assert!(p.is_poisoned()) + }); + assert!(called); + + // once any success happens, we stop propagating the poison + O.call_once(|| {}); +} + +#[test] +fn wait_for_force_to_finish() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // make sure someone's waiting inside the once via a force + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t1 = thread::spawn(move || { + O.call_once_force(|p| { + assert!(p.is_poisoned()); + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); + + rx1.recv().unwrap(); + + // put another waiter on the once + let t2 = thread::spawn(|| { + let mut called = false; + O.call_once(|| { + called = true; + }); + assert!(!called); + }); + + tx2.send(()).unwrap(); + + assert!(t1.join().is_ok()); + assert!(t2.join().is_ok()); +} diff --git a/library/std/src/sync/once_lock.rs b/library/std/src/sync/once_lock.rs new file mode 100644 index 000000000..813516040 --- /dev/null +++ b/library/std/src/sync/once_lock.rs @@ -0,0 +1,496 @@ +use crate::cell::UnsafeCell; +use crate::fmt; +use crate::marker::PhantomData; +use crate::mem::MaybeUninit; +use crate::panic::{RefUnwindSafe, UnwindSafe}; +use crate::pin::Pin; +use crate::sync::Once; + +/// A synchronization primitive which can be written to only once. +/// +/// This type is a thread-safe `OnceCell`. 
+/// +/// # Examples +/// +/// ``` +/// #![feature(once_cell)] +/// +/// use std::sync::OnceLock; +/// +/// static CELL: OnceLock = OnceLock::new(); +/// assert!(CELL.get().is_none()); +/// +/// std::thread::spawn(|| { +/// let value: &String = CELL.get_or_init(|| { +/// "Hello, World!".to_string() +/// }); +/// assert_eq!(value, "Hello, World!"); +/// }).join().unwrap(); +/// +/// let value: Option<&String> = CELL.get(); +/// assert!(value.is_some()); +/// assert_eq!(value.unwrap().as_str(), "Hello, World!"); +/// ``` +#[unstable(feature = "once_cell", issue = "74465")] +pub struct OnceLock { + once: Once, + // Whether or not the value is initialized is tracked by `state_and_queue`. + value: UnsafeCell>, + /// `PhantomData` to make sure dropck understands we're dropping T in our Drop impl. + /// + /// ```compile_fail,E0597 + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// struct A<'a>(&'a str); + /// + /// impl<'a> Drop for A<'a> { + /// fn drop(&mut self) {} + /// } + /// + /// let cell = OnceLock::new(); + /// { + /// let s = String::new(); + /// let _ = cell.set(A(&s)); + /// } + /// ``` + _marker: PhantomData, +} + +impl OnceLock { + /// Creates a new empty cell. + #[unstable(feature = "once_cell", issue = "74465")] + #[must_use] + pub const fn new() -> OnceLock { + OnceLock { + once: Once::new(), + value: UnsafeCell::new(MaybeUninit::uninit()), + _marker: PhantomData, + } + } + + /// Gets the reference to the underlying value. + /// + /// Returns `None` if the cell is empty, or being initialized. This + /// method never blocks. + #[unstable(feature = "once_cell", issue = "74465")] + pub fn get(&self) -> Option<&T> { + if self.is_initialized() { + // Safe b/c checked is_initialized + Some(unsafe { self.get_unchecked() }) + } else { + None + } + } + + /// Gets the mutable reference to the underlying value. + /// + /// Returns `None` if the cell is empty. This method never blocks. + #[unstable(feature = "once_cell", issue = "74465")] + pub fn get_mut(&mut self) -> Option<&mut T> { + if self.is_initialized() { + // Safe b/c checked is_initialized and we have a unique access + Some(unsafe { self.get_unchecked_mut() }) + } else { + None + } + } + + /// Sets the contents of this cell to `value`. + /// + /// May block if another thread is currently attempting to initialize the cell. The cell is + /// guaranteed to contain a value when set returns, though not necessarily the one provided. + /// + /// Returns `Ok(())` if the cell's value was set by this call. + /// + /// # Examples + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// static CELL: OnceLock = OnceLock::new(); + /// + /// fn main() { + /// assert!(CELL.get().is_none()); + /// + /// std::thread::spawn(|| { + /// assert_eq!(CELL.set(92), Ok(())); + /// }).join().unwrap(); + /// + /// assert_eq!(CELL.set(62), Err(62)); + /// assert_eq!(CELL.get(), Some(&92)); + /// } + /// ``` + #[unstable(feature = "once_cell", issue = "74465")] + pub fn set(&self, value: T) -> Result<(), T> { + let mut value = Some(value); + self.get_or_init(|| value.take().unwrap()); + match value { + None => Ok(()), + Some(value) => Err(value), + } + } + + /// Gets the contents of the cell, initializing it with `f` if the cell + /// was empty. + /// + /// Many threads may call `get_or_init` concurrently with different + /// initializing functions, but it is guaranteed that only one function + /// will be executed. 
+ /// + /// # Panics + /// + /// If `f` panics, the panic is propagated to the caller, and the cell + /// remains uninitialized. + /// + /// It is an error to reentrantly initialize the cell from `f`. The + /// exact outcome is unspecified. Current implementation deadlocks, but + /// this may be changed to a panic in the future. + /// + /// # Examples + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// let cell = OnceLock::new(); + /// let value = cell.get_or_init(|| 92); + /// assert_eq!(value, &92); + /// let value = cell.get_or_init(|| unreachable!()); + /// assert_eq!(value, &92); + /// ``` + #[unstable(feature = "once_cell", issue = "74465")] + pub fn get_or_init(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + match self.get_or_try_init(|| Ok::(f())) { + Ok(val) => val, + } + } + + /// Gets the contents of the cell, initializing it with `f` if + /// the cell was empty. If the cell was empty and `f` failed, an + /// error is returned. + /// + /// # Panics + /// + /// If `f` panics, the panic is propagated to the caller, and + /// the cell remains uninitialized. + /// + /// It is an error to reentrantly initialize the cell from `f`. + /// The exact outcome is unspecified. Current implementation + /// deadlocks, but this may be changed to a panic in the future. + /// + /// # Examples + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// let cell = OnceLock::new(); + /// assert_eq!(cell.get_or_try_init(|| Err(())), Err(())); + /// assert!(cell.get().is_none()); + /// let value = cell.get_or_try_init(|| -> Result { + /// Ok(92) + /// }); + /// assert_eq!(value, Ok(&92)); + /// assert_eq!(cell.get(), Some(&92)) + /// ``` + #[unstable(feature = "once_cell", issue = "74465")] + pub fn get_or_try_init(&self, f: F) -> Result<&T, E> + where + F: FnOnce() -> Result, + { + // Fast path check + // NOTE: We need to perform an acquire on the state in this method + // in order to correctly synchronize `LazyLock::force`. This is + // currently done by calling `self.get()`, which in turn calls + // `self.is_initialized()`, which in turn performs the acquire. + if let Some(value) = self.get() { + return Ok(value); + } + self.initialize(f)?; + + debug_assert!(self.is_initialized()); + + // SAFETY: The inner value has been initialized + Ok(unsafe { self.get_unchecked() }) + } + + /// Internal-only API that gets the contents of the cell, initializing it + /// in two steps with `f` and `g` if the cell was empty. + /// + /// `f` is called to construct the value, which is then moved into the cell + /// and given as a (pinned) mutable reference to `g` to finish + /// initialization. + /// + /// This allows `g` to inspect an manipulate the value after it has been + /// moved into its final place in the cell, but before the cell is + /// considered initialized. + /// + /// # Panics + /// + /// If `f` or `g` panics, the panic is propagated to the caller, and the + /// cell remains uninitialized. + /// + /// With the current implementation, if `g` panics, the value from `f` will + /// not be dropped. This should probably be fixed if this is ever used for + /// a type where this matters. + /// + /// It is an error to reentrantly initialize the cell from `f`. The exact + /// outcome is unspecified. Current implementation deadlocks, but this may + /// be changed to a panic in the future. 
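// An illustrative sketch (the `demo` name and closure bodies are invented, and
// this is not part of the patch) of how a caller inside the standard library
// might use the two-phase API documented above: `f` builds the value, then `g`
// receives it pinned at its final address to finish initialization.
//
//     fn demo(cell: Pin<&OnceLock<String>>) -> Pin<&String> {
//         cell.get_or_init_pin(
//             || String::from("built"),        // step 1: construct the value
//             |mut s| s.push_str(" in place"), // step 2: finish initialization
//                                              //         at the final address
//         )
//     }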
+ pub(crate) fn get_or_init_pin(self: Pin<&Self>, f: F, g: G) -> Pin<&T> + where + F: FnOnce() -> T, + G: FnOnce(Pin<&mut T>), + { + if let Some(value) = self.get_ref().get() { + // SAFETY: The inner value was already initialized, and will not be + // moved anymore. + return unsafe { Pin::new_unchecked(value) }; + } + + let slot = &self.value; + + // Ignore poisoning from other threads + // If another thread panics, then we'll be able to run our closure + self.once.call_once_force(|_| { + let value = f(); + // SAFETY: We use the Once (self.once) to guarantee unique access + // to the UnsafeCell (slot). + let value: &mut T = unsafe { (&mut *slot.get()).write(value) }; + // SAFETY: The value has been written to its final place in + // self.value. We do not to move it anymore, which we promise here + // with a Pin<&mut T>. + g(unsafe { Pin::new_unchecked(value) }); + }); + + // SAFETY: The inner value has been initialized, and will not be moved + // anymore. + unsafe { Pin::new_unchecked(self.get_ref().get_unchecked()) } + } + + /// Consumes the `OnceLock`, returning the wrapped value. Returns + /// `None` if the cell was empty. + /// + /// # Examples + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// let cell: OnceLock = OnceLock::new(); + /// assert_eq!(cell.into_inner(), None); + /// + /// let cell = OnceLock::new(); + /// cell.set("hello".to_string()).unwrap(); + /// assert_eq!(cell.into_inner(), Some("hello".to_string())); + /// ``` + #[unstable(feature = "once_cell", issue = "74465")] + pub fn into_inner(mut self) -> Option { + self.take() + } + + /// Takes the value out of this `OnceLock`, moving it back to an uninitialized state. + /// + /// Has no effect and returns `None` if the `OnceLock` hasn't been initialized. + /// + /// Safety is guaranteed by requiring a mutable reference. + /// + /// # Examples + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// let mut cell: OnceLock = OnceLock::new(); + /// assert_eq!(cell.take(), None); + /// + /// let mut cell = OnceLock::new(); + /// cell.set("hello".to_string()).unwrap(); + /// assert_eq!(cell.take(), Some("hello".to_string())); + /// assert_eq!(cell.get(), None); + /// ``` + #[unstable(feature = "once_cell", issue = "74465")] + pub fn take(&mut self) -> Option { + if self.is_initialized() { + self.once = Once::new(); + // SAFETY: `self.value` is initialized and contains a valid `T`. + // `self.once` is reset, so `is_initialized()` will be false again + // which prevents the value from being read twice. + unsafe { Some((&mut *self.value.get()).assume_init_read()) } + } else { + None + } + } + + #[inline] + fn is_initialized(&self) -> bool { + self.once.is_completed() + } + + #[cold] + fn initialize(&self, f: F) -> Result<(), E> + where + F: FnOnce() -> Result, + { + let mut res: Result<(), E> = Ok(()); + let slot = &self.value; + + // Ignore poisoning from other threads + // If another thread panics, then we'll be able to run our closure + self.once.call_once_force(|p| { + match f() { + Ok(value) => { + unsafe { (&mut *slot.get()).write(value) }; + } + Err(e) => { + res = Err(e); + + // Treat the underlying `Once` as poisoned since we + // failed to initialize our value. 
Calls + p.poison(); + } + } + }); + res + } + + /// # Safety + /// + /// The value must be initialized + unsafe fn get_unchecked(&self) -> &T { + debug_assert!(self.is_initialized()); + (&*self.value.get()).assume_init_ref() + } + + /// # Safety + /// + /// The value must be initialized + unsafe fn get_unchecked_mut(&mut self) -> &mut T { + debug_assert!(self.is_initialized()); + (&mut *self.value.get()).assume_init_mut() + } +} + +// Why do we need `T: Send`? +// Thread A creates a `OnceLock` and shares it with +// scoped thread B, which fills the cell, which is +// then destroyed by A. That is, destructor observes +// a sent value. +#[unstable(feature = "once_cell", issue = "74465")] +unsafe impl Sync for OnceLock {} +#[unstable(feature = "once_cell", issue = "74465")] +unsafe impl Send for OnceLock {} + +#[unstable(feature = "once_cell", issue = "74465")] +impl RefUnwindSafe for OnceLock {} +#[unstable(feature = "once_cell", issue = "74465")] +impl UnwindSafe for OnceLock {} + +#[unstable(feature = "once_cell", issue = "74465")] +#[rustc_const_unstable(feature = "const_default_impls", issue = "87864")] +impl const Default for OnceLock { + /// Creates a new empty cell. + /// + /// # Example + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// fn main() { + /// assert_eq!(OnceLock::<()>::new(), OnceLock::default()); + /// } + /// ``` + fn default() -> OnceLock { + OnceLock::new() + } +} + +#[unstable(feature = "once_cell", issue = "74465")] +impl fmt::Debug for OnceLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.get() { + Some(v) => f.debug_tuple("Once").field(v).finish(), + None => f.write_str("Once(Uninit)"), + } + } +} + +#[unstable(feature = "once_cell", issue = "74465")] +impl Clone for OnceLock { + fn clone(&self) -> OnceLock { + let cell = Self::new(); + if let Some(value) = self.get() { + match cell.set(value.clone()) { + Ok(()) => (), + Err(_) => unreachable!(), + } + } + cell + } +} + +#[unstable(feature = "once_cell", issue = "74465")] +impl From for OnceLock { + /// Create a new cell with its contents set to `value`. + /// + /// # Example + /// + /// ``` + /// #![feature(once_cell)] + /// + /// use std::sync::OnceLock; + /// + /// # fn main() -> Result<(), i32> { + /// let a = OnceLock::from(3); + /// let b = OnceLock::new(); + /// b.set(3)?; + /// assert_eq!(a, b); + /// Ok(()) + /// # } + /// ``` + fn from(value: T) -> Self { + let cell = Self::new(); + match cell.set(value) { + Ok(()) => cell, + Err(_) => unreachable!(), + } + } +} + +#[unstable(feature = "once_cell", issue = "74465")] +impl PartialEq for OnceLock { + fn eq(&self, other: &OnceLock) -> bool { + self.get() == other.get() + } +} + +#[unstable(feature = "once_cell", issue = "74465")] +impl Eq for OnceLock {} + +#[unstable(feature = "once_cell", issue = "74465")] +unsafe impl<#[may_dangle] T> Drop for OnceLock { + fn drop(&mut self) { + if self.is_initialized() { + // SAFETY: The cell is initialized and being dropped, so it can't + // be accessed again. We also don't touch the `T` other than + // dropping it, which validates our usage of #[may_dangle]. 
+ unsafe { (&mut *self.value.get()).assume_init_drop() }; + } + } +} + +#[cfg(test)] +mod tests; diff --git a/library/std/src/sync/once_lock/tests.rs b/library/std/src/sync/once_lock/tests.rs new file mode 100644 index 000000000..46695225b --- /dev/null +++ b/library/std/src/sync/once_lock/tests.rs @@ -0,0 +1,203 @@ +use crate::{ + panic, + sync::OnceLock, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + mpsc::channel, + }, + thread, +}; + +fn spawn_and_wait(f: impl FnOnce() -> R + Send + 'static) -> R { + thread::spawn(f).join().unwrap() +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn sync_once_cell() { + static ONCE_CELL: OnceLock = OnceLock::new(); + + assert!(ONCE_CELL.get().is_none()); + + spawn_and_wait(|| { + ONCE_CELL.get_or_init(|| 92); + assert_eq!(ONCE_CELL.get(), Some(&92)); + }); + + ONCE_CELL.get_or_init(|| panic!("Kabom!")); + assert_eq!(ONCE_CELL.get(), Some(&92)); +} + +#[test] +fn sync_once_cell_get_mut() { + let mut c = OnceLock::new(); + assert!(c.get_mut().is_none()); + c.set(90).unwrap(); + *c.get_mut().unwrap() += 2; + assert_eq!(c.get_mut(), Some(&mut 92)); +} + +#[test] +fn sync_once_cell_get_unchecked() { + let c = OnceLock::new(); + c.set(92).unwrap(); + unsafe { + assert_eq!(c.get_unchecked(), &92); + } +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn sync_once_cell_drop() { + static DROP_CNT: AtomicUsize = AtomicUsize::new(0); + struct Dropper; + impl Drop for Dropper { + fn drop(&mut self) { + DROP_CNT.fetch_add(1, SeqCst); + } + } + + let x = OnceLock::new(); + spawn_and_wait(move || { + x.get_or_init(|| Dropper); + assert_eq!(DROP_CNT.load(SeqCst), 0); + drop(x); + }); + + assert_eq!(DROP_CNT.load(SeqCst), 1); +} + +#[test] +fn sync_once_cell_drop_empty() { + let x = OnceLock::::new(); + drop(x); +} + +#[test] +fn clone() { + let s = OnceLock::new(); + let c = s.clone(); + assert!(c.get().is_none()); + + s.set("hello".to_string()).unwrap(); + let c = s.clone(); + assert_eq!(c.get().map(String::as_str), Some("hello")); +} + +#[test] +fn get_or_try_init() { + let cell: OnceLock = OnceLock::new(); + assert!(cell.get().is_none()); + + let res = panic::catch_unwind(|| cell.get_or_try_init(|| -> Result<_, ()> { panic!() })); + assert!(res.is_err()); + assert!(!cell.is_initialized()); + assert!(cell.get().is_none()); + + assert_eq!(cell.get_or_try_init(|| Err(())), Err(())); + + assert_eq!(cell.get_or_try_init(|| Ok::<_, ()>("hello".to_string())), Ok(&"hello".to_string())); + assert_eq!(cell.get(), Some(&"hello".to_string())); +} + +#[test] +fn from_impl() { + assert_eq!(OnceLock::from("value").get(), Some(&"value")); + assert_ne!(OnceLock::from("foo").get(), Some(&"bar")); +} + +#[test] +fn partialeq_impl() { + assert!(OnceLock::from("value") == OnceLock::from("value")); + assert!(OnceLock::from("foo") != OnceLock::from("bar")); + + assert!(OnceLock::::new() == OnceLock::new()); + assert!(OnceLock::::new() != OnceLock::from("value".to_owned())); +} + +#[test] +fn into_inner() { + let cell: OnceLock = OnceLock::new(); + assert_eq!(cell.into_inner(), None); + let cell = OnceLock::new(); + cell.set("hello".to_string()).unwrap(); + assert_eq!(cell.into_inner(), Some("hello".to_string())); +} + +#[test] +fn is_sync_send() { + fn assert_traits() {} + assert_traits::>(); +} + +#[test] +fn eval_once_macro() { + macro_rules! 
eval_once { + (|| -> $ty:ty { + $($body:tt)* + }) => {{ + static ONCE_CELL: OnceLock<$ty> = OnceLock::new(); + fn init() -> $ty { + $($body)* + } + ONCE_CELL.get_or_init(init) + }}; + } + + let fib: &'static Vec = eval_once! { + || -> Vec { + let mut res = vec![1, 1]; + for i in 0..10 { + let next = res[i] + res[i + 1]; + res.push(next); + } + res + } + }; + assert_eq!(fib[5], 8) +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn sync_once_cell_does_not_leak_partially_constructed_boxes() { + static ONCE_CELL: OnceLock = OnceLock::new(); + + let n_readers = 10; + let n_writers = 3; + const MSG: &str = "Hello, World"; + + let (tx, rx) = channel(); + + for _ in 0..n_readers { + let tx = tx.clone(); + thread::spawn(move || { + loop { + if let Some(msg) = ONCE_CELL.get() { + tx.send(msg).unwrap(); + break; + } + #[cfg(target_env = "sgx")] + crate::thread::yield_now(); + } + }); + } + for _ in 0..n_writers { + thread::spawn(move || { + let _ = ONCE_CELL.set(MSG.to_owned()); + }); + } + + for _ in 0..n_readers { + let msg = rx.recv().unwrap(); + assert_eq!(msg, MSG); + } +} + +#[test] +fn dropck() { + let cell = OnceLock::new(); + { + let s = String::new(); + cell.set(&s).unwrap(); + } +} diff --git a/library/std/src/sync/poison.rs b/library/std/src/sync/poison.rs new file mode 100644 index 000000000..741312d55 --- /dev/null +++ b/library/std/src/sync/poison.rs @@ -0,0 +1,272 @@ +use crate::error::Error; +use crate::fmt; +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::thread; + +pub struct Flag { + failed: AtomicBool, +} + +// Note that the Ordering uses to access the `failed` field of `Flag` below is +// always `Relaxed`, and that's because this isn't actually protecting any data, +// it's just a flag whether we've panicked or not. +// +// The actual location that this matters is when a mutex is **locked** which is +// where we have external synchronization ensuring that we see memory +// reads/writes to this flag. +// +// As a result, if it matters, we should see the correct value for `failed` in +// all cases. + +impl Flag { + #[inline] + pub const fn new() -> Flag { + Flag { failed: AtomicBool::new(false) } + } + + /// Check the flag for an unguarded borrow, where we only care about existing poison. + #[inline] + pub fn borrow(&self) -> LockResult<()> { + if self.get() { Err(PoisonError::new(())) } else { Ok(()) } + } + + /// Check the flag for a guarded borrow, where we may also set poison when `done`. + #[inline] + pub fn guard(&self) -> LockResult { + let ret = Guard { panicking: thread::panicking() }; + if self.get() { Err(PoisonError::new(ret)) } else { Ok(ret) } + } + + #[inline] + pub fn done(&self, guard: &Guard) { + if !guard.panicking && thread::panicking() { + self.failed.store(true, Ordering::Relaxed); + } + } + + #[inline] + pub fn get(&self) -> bool { + self.failed.load(Ordering::Relaxed) + } + + #[inline] + pub fn clear(&self) { + self.failed.store(false, Ordering::Relaxed) + } +} + +pub struct Guard { + panicking: bool, +} + +/// A type of error which can be returned whenever a lock is acquired. +/// +/// Both [`Mutex`]es and [`RwLock`]s are poisoned whenever a thread fails while the lock +/// is held. The precise semantics for when a lock is poisoned is documented on +/// each lock, but once a lock is poisoned then all future acquisitions will +/// return this error. 
+/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use std::thread; +/// +/// let mutex = Arc::new(Mutex::new(1)); +/// +/// // poison the mutex +/// let c_mutex = Arc::clone(&mutex); +/// let _ = thread::spawn(move || { +/// let mut data = c_mutex.lock().unwrap(); +/// *data = 2; +/// panic!(); +/// }).join(); +/// +/// match mutex.lock() { +/// Ok(_) => unreachable!(), +/// Err(p_err) => { +/// let data = p_err.get_ref(); +/// println!("recovered: {data}"); +/// } +/// }; +/// ``` +/// [`Mutex`]: crate::sync::Mutex +/// [`RwLock`]: crate::sync::RwLock +#[stable(feature = "rust1", since = "1.0.0")] +pub struct PoisonError { + guard: T, +} + +/// An enumeration of possible errors associated with a [`TryLockResult`] which +/// can occur while trying to acquire a lock, from the [`try_lock`] method on a +/// [`Mutex`] or the [`try_read`] and [`try_write`] methods on an [`RwLock`]. +/// +/// [`try_lock`]: crate::sync::Mutex::try_lock +/// [`try_read`]: crate::sync::RwLock::try_read +/// [`try_write`]: crate::sync::RwLock::try_write +/// [`Mutex`]: crate::sync::Mutex +/// [`RwLock`]: crate::sync::RwLock +#[stable(feature = "rust1", since = "1.0.0")] +pub enum TryLockError { + /// The lock could not be acquired because another thread failed while holding + /// the lock. + #[stable(feature = "rust1", since = "1.0.0")] + Poisoned(#[stable(feature = "rust1", since = "1.0.0")] PoisonError), + /// The lock could not be acquired at this time because the operation would + /// otherwise block. + #[stable(feature = "rust1", since = "1.0.0")] + WouldBlock, +} + +/// A type alias for the result of a lock method which can be poisoned. +/// +/// The [`Ok`] variant of this result indicates that the primitive was not +/// poisoned, and the `Guard` is contained within. The [`Err`] variant indicates +/// that the primitive was poisoned. Note that the [`Err`] variant *also* carries +/// the associated guard, and it can be acquired through the [`into_inner`] +/// method. +/// +/// [`into_inner`]: PoisonError::into_inner +#[stable(feature = "rust1", since = "1.0.0")] +pub type LockResult = Result>; + +/// A type alias for the result of a nonblocking locking method. +/// +/// For more information, see [`LockResult`]. A `TryLockResult` doesn't +/// necessarily hold the associated guard in the [`Err`] type as the lock might not +/// have been acquired for other reasons. +#[stable(feature = "rust1", since = "1.0.0")] +pub type TryLockResult = Result>; + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for PoisonError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PoisonError").finish_non_exhaustive() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for PoisonError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "poisoned lock: another task failed inside".fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Error for PoisonError { + #[allow(deprecated)] + fn description(&self) -> &str { + "poisoned lock: another task failed inside" + } +} + +impl PoisonError { + /// Creates a `PoisonError`. + /// + /// This is generally created by methods like [`Mutex::lock`](crate::sync::Mutex::lock) + /// or [`RwLock::read`](crate::sync::RwLock::read). + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn new(guard: T) -> PoisonError { + PoisonError { guard } + } + + /// Consumes this error indicating that a lock is poisoned, returning the + /// underlying guard to allow access regardless. 
+ /// + /// # Examples + /// + /// ``` + /// use std::collections::HashSet; + /// use std::sync::{Arc, Mutex}; + /// use std::thread; + /// + /// let mutex = Arc::new(Mutex::new(HashSet::new())); + /// + /// // poison the mutex + /// let c_mutex = Arc::clone(&mutex); + /// let _ = thread::spawn(move || { + /// let mut data = c_mutex.lock().unwrap(); + /// data.insert(10); + /// panic!(); + /// }).join(); + /// + /// let p_err = mutex.lock().unwrap_err(); + /// let data = p_err.into_inner(); + /// println!("recovered {} items", data.len()); + /// ``` + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn into_inner(self) -> T { + self.guard + } + + /// Reaches into this error indicating that a lock is poisoned, returning a + /// reference to the underlying guard to allow access regardless. + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn get_ref(&self) -> &T { + &self.guard + } + + /// Reaches into this error indicating that a lock is poisoned, returning a + /// mutable reference to the underlying guard to allow access regardless. + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn get_mut(&mut self) -> &mut T { + &mut self.guard + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl From> for TryLockError { + fn from(err: PoisonError) -> TryLockError { + TryLockError::Poisoned(err) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for TryLockError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryLockError::Poisoned(..) => "Poisoned(..)".fmt(f), + TryLockError::WouldBlock => "WouldBlock".fmt(f), + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TryLockError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryLockError::Poisoned(..) => "poisoned lock: another task failed inside", + TryLockError::WouldBlock => "try_lock failed because the operation would block", + } + .fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Error for TryLockError { + #[allow(deprecated, deprecated_in_future)] + fn description(&self) -> &str { + match *self { + TryLockError::Poisoned(ref p) => p.description(), + TryLockError::WouldBlock => "try_lock failed because the operation would block", + } + } + + #[allow(deprecated)] + fn cause(&self) -> Option<&dyn Error> { + match *self { + TryLockError::Poisoned(ref p) => Some(p), + _ => None, + } + } +} + +pub fn map_result(result: LockResult, f: F) -> LockResult +where + F: FnOnce(T) -> U, +{ + match result { + Ok(t) => Ok(f(t)), + Err(PoisonError { guard }) => Err(PoisonError::new(f(guard))), + } +} diff --git a/library/std/src/sync/rwlock.rs b/library/std/src/sync/rwlock.rs new file mode 100644 index 000000000..6e4a2cfc8 --- /dev/null +++ b/library/std/src/sync/rwlock.rs @@ -0,0 +1,615 @@ +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +use crate::cell::UnsafeCell; +use crate::fmt; +use crate::ops::{Deref, DerefMut}; +use crate::ptr::NonNull; +use crate::sync::{poison, LockResult, TryLockError, TryLockResult}; +use crate::sys_common::rwlock as sys; + +/// A reader-writer lock +/// +/// This type of lock allows a number of readers or at most one writer at any +/// point in time. The write portion of this lock typically allows modification +/// of the underlying data (exclusive access) and the read portion of this lock +/// typically allows for read-only access (shared access). 
+/// +/// In comparison, a [`Mutex`] does not distinguish between readers or writers +/// that acquire the lock, therefore blocking any threads waiting for the lock to +/// become available. An `RwLock` will allow any number of readers to acquire the +/// lock as long as a writer is not holding the lock. +/// +/// The priority policy of the lock is dependent on the underlying operating +/// system's implementation, and this type does not guarantee that any +/// particular policy will be used. In particular, a writer which is waiting to +/// acquire the lock in `write` might or might not block concurrent calls to +/// `read`, e.g.: +/// +///
<details><summary>Potential deadlock example</summary>
+///
+/// ```text
+/// // Thread 1              |  // Thread 2
+/// let _rg = lock.read();   |
+///                          |  // will block
+///                          |  let _wg = lock.write();
+/// // may deadlock          |
+/// let _rg = lock.read();   |
+/// ```
+/// </details>
+/// +/// The type parameter `T` represents the data that this lock protects. It is +/// required that `T` satisfies [`Send`] to be shared across threads and +/// [`Sync`] to allow concurrent access through readers. The RAII guards +/// returned from the locking methods implement [`Deref`] (and [`DerefMut`] +/// for the `write` methods) to allow access to the content of the lock. +/// +/// # Poisoning +/// +/// An `RwLock`, like [`Mutex`], will become poisoned on a panic. Note, however, +/// that an `RwLock` may only be poisoned if a panic occurs while it is locked +/// exclusively (write mode). If a panic occurs in any reader, then the lock +/// will not be poisoned. +/// +/// # Examples +/// +/// ``` +/// use std::sync::RwLock; +/// +/// let lock = RwLock::new(5); +/// +/// // many reader locks can be held at once +/// { +/// let r1 = lock.read().unwrap(); +/// let r2 = lock.read().unwrap(); +/// assert_eq!(*r1, 5); +/// assert_eq!(*r2, 5); +/// } // read locks are dropped at this point +/// +/// // only one write lock may be held, however +/// { +/// let mut w = lock.write().unwrap(); +/// *w += 1; +/// assert_eq!(*w, 6); +/// } // write lock is dropped here +/// ``` +/// +/// [`Mutex`]: super::Mutex +#[stable(feature = "rust1", since = "1.0.0")] +pub struct RwLock { + inner: sys::MovableRwLock, + poison: poison::Flag, + data: UnsafeCell, +} + +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Send for RwLock {} +#[stable(feature = "rust1", since = "1.0.0")] +unsafe impl Sync for RwLock {} + +/// RAII structure used to release the shared read access of a lock when +/// dropped. +/// +/// This structure is created by the [`read`] and [`try_read`] methods on +/// [`RwLock`]. +/// +/// [`read`]: RwLock::read +/// [`try_read`]: RwLock::try_read +#[must_use = "if unused the RwLock will immediately unlock"] +#[must_not_suspend = "holding a RwLockReadGuard across suspend \ + points can cause deadlocks, delays, \ + and cause Futures to not implement `Send`"] +#[stable(feature = "rust1", since = "1.0.0")] +#[clippy::has_significant_drop] +pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { + // NB: we use a pointer instead of `&'a T` to avoid `noalias` violations, because a + // `Ref` argument doesn't hold immutability for its whole scope, only until it drops. + // `NonNull` is also covariant over `T`, just like we would have with `&T`. `NonNull` + // is preferable over `const* T` to allow for niche optimization. + data: NonNull, + inner_lock: &'a sys::MovableRwLock, +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl !Send for RwLockReadGuard<'_, T> {} + +#[stable(feature = "rwlock_guard_sync", since = "1.23.0")] +unsafe impl Sync for RwLockReadGuard<'_, T> {} + +/// RAII structure used to release the exclusive write access of a lock when +/// dropped. +/// +/// This structure is created by the [`write`] and [`try_write`] methods +/// on [`RwLock`]. 
+/// +/// [`write`]: RwLock::write +/// [`try_write`]: RwLock::try_write +#[must_use = "if unused the RwLock will immediately unlock"] +#[must_not_suspend = "holding a RwLockWriteGuard across suspend \ + points can cause deadlocks, delays, \ + and cause Future's to not implement `Send`"] +#[stable(feature = "rust1", since = "1.0.0")] +#[clippy::has_significant_drop] +pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> { + lock: &'a RwLock, + poison: poison::Guard, +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl !Send for RwLockWriteGuard<'_, T> {} + +#[stable(feature = "rwlock_guard_sync", since = "1.23.0")] +unsafe impl Sync for RwLockWriteGuard<'_, T> {} + +impl RwLock { + /// Creates a new instance of an `RwLock` which is unlocked. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(5); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] + #[inline] + pub const fn new(t: T) -> RwLock { + RwLock { + inner: sys::MovableRwLock::new(), + poison: poison::Flag::new(), + data: UnsafeCell::new(t), + } + } +} + +impl RwLock { + /// Locks this rwlock with shared read access, blocking the current thread + /// until it can be acquired. + /// + /// The calling thread will be blocked until there are no more writers which + /// hold the lock. There may be other readers currently inside the lock when + /// this method returns. This method does not provide any guarantees with + /// respect to the ordering of whether contentious readers or writers will + /// acquire the lock first. + /// + /// Returns an RAII guard which will release this thread's shared access + /// once it is dropped. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. + /// The failure will occur immediately after the lock has been acquired. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// let c_lock = Arc::clone(&lock); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn read(&self) -> LockResult> { + unsafe { + self.inner.read(); + RwLockReadGuard::new(self) + } + } + + /// Attempts to acquire this rwlock with shared read access. + /// + /// If the access could not be granted at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned which will release the shared access + /// when it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + /// + /// # Errors + /// + /// This function will return the [`Poisoned`] error if the RwLock is poisoned. + /// An RwLock is poisoned whenever a writer panics while holding an exclusive + /// lock. `Poisoned` will only be returned if the lock would have otherwise been + /// acquired. + /// + /// This function will return the [`WouldBlock`] error if the RwLock could not + /// be acquired because it was already locked exclusively. 
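// An illustrative sketch (the `shared` binding is invented, and this is not
// part of the patch) of distinguishing the two error cases named above when
// calling `try_read`:
//
//     use std::sync::{RwLock, TryLockError};
//
//     let shared: RwLock<Vec<i32>> = RwLock::new(vec![1, 2, 3]);
//     match shared.try_read() {
//         Ok(guard) => println!("read {} items", guard.len()),
//         Err(TryLockError::WouldBlock) => println!("a writer currently holds the lock"),
//         Err(TryLockError::Poisoned(err)) => {
//             // Recover the guard and read anyway, accepting that a writer
//             // panicked while it held the lock.
//             let guard = err.into_inner();
//             println!("poisoned, but read {} items anyway", guard.len());
//         }
//     }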
+ /// + /// [`Poisoned`]: TryLockError::Poisoned + /// [`WouldBlock`]: TryLockError::WouldBlock + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_read(&self) -> TryLockResult> { + unsafe { + if self.inner.try_read() { + Ok(RwLockReadGuard::new(self)?) + } else { + Err(TryLockError::WouldBlock) + } + } + } + + /// Locks this rwlock with exclusive write access, blocking the current + /// thread until it can be acquired. + /// + /// This function will not return while other writers or other readers + /// currently have access to the lock. + /// + /// Returns an RAII guard which will drop the write access of this rwlock + /// when dropped. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. + /// An error will be returned when the lock is acquired. + /// + /// # Panics + /// + /// This function might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn write(&self) -> LockResult> { + unsafe { + self.inner.write(); + RwLockWriteGuard::new(self) + } + } + + /// Attempts to lock this rwlock with exclusive write access. + /// + /// If the lock could not be acquired at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned which will release the lock when + /// it is dropped. + /// + /// This function does not block. + /// + /// This function does not provide any guarantees with respect to the ordering + /// of whether contentious readers or writers will acquire the lock first. + /// + /// # Errors + /// + /// This function will return the [`Poisoned`] error if the RwLock is + /// poisoned. An RwLock is poisoned whenever a writer panics while holding + /// an exclusive lock. `Poisoned` will only be returned if the lock would have + /// otherwise been acquired. + /// + /// This function will return the [`WouldBlock`] error if the RwLock could not + /// be acquired because it was already locked exclusively. + /// + /// [`Poisoned`]: TryLockError::Poisoned + /// [`WouldBlock`]: TryLockError::WouldBlock + /// + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` + #[inline] + #[stable(feature = "rust1", since = "1.0.0")] + pub fn try_write(&self) -> TryLockResult> { + unsafe { + if self.inner.try_write() { + Ok(RwLockWriteGuard::new(self)?) + } else { + Err(TryLockError::WouldBlock) + } + } + } + + /// Determines whether the lock is poisoned. + /// + /// If another thread is active, the lock can still become poisoned at any + /// time. You should not trust a `false` value for program correctness + /// without additional synchronization. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(0)); + /// let c_lock = Arc::clone(&lock); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` + #[inline] + #[stable(feature = "sync_poison", since = "1.2.0")] + pub fn is_poisoned(&self) -> bool { + self.poison.get() + } + + /// Clear the poisoned state from a lock + /// + /// If the lock is poisoned, it will remain poisoned until this function is called. This allows + /// recovering from a poisoned state and marking that it has recovered. For example, if the + /// value is overwritten by a known-good value, then the mutex can be marked as un-poisoned. Or + /// possibly, the value could be inspected to determine if it is in a consistent state, and if + /// so the poison is removed. + /// + /// # Examples + /// + /// ``` + /// #![feature(mutex_unpoison)] + /// + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(0)); + /// let c_lock = Arc::clone(&lock); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the mutex gets poisoned + /// }).join(); + /// + /// assert_eq!(lock.is_poisoned(), true); + /// let guard = lock.write().unwrap_or_else(|mut e| { + /// **e.get_mut() = 1; + /// lock.clear_poison(); + /// e.into_inner() + /// }); + /// assert_eq!(lock.is_poisoned(), false); + /// assert_eq!(*guard, 1); + /// ``` + #[inline] + #[unstable(feature = "mutex_unpoison", issue = "96469")] + pub fn clear_poison(&self) { + self.poison.clear(); + } + + /// Consumes this `RwLock`, returning the underlying data. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` + #[stable(feature = "rwlock_into_inner", since = "1.6.0")] + pub fn into_inner(self) -> LockResult + where + T: Sized, + { + let data = self.data.into_inner(); + poison::map_result(self.poison.borrow(), |()| data) + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `RwLock` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Errors + /// + /// This function will return an error if the RwLock is poisoned. An RwLock + /// is poisoned whenever a writer panics while holding an exclusive lock. An + /// error will only be returned if the lock would have otherwise been + /// acquired. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let mut lock = RwLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` + #[stable(feature = "rwlock_get_mut", since = "1.6.0")] + pub fn get_mut(&mut self) -> LockResult<&mut T> { + let data = self.data.get_mut(); + poison::map_result(self.poison.borrow(), |()| data) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut d = f.debug_struct("RwLock"); + match self.try_read() { + Ok(guard) => { + d.field("data", &&*guard); + } + Err(TryLockError::Poisoned(err)) => { + d.field("data", &&**err.get_ref()); + } + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + d.field("data", &LockedPlaceholder); + } + } + d.field("poisoned", &self.poison.get()); + d.finish_non_exhaustive() + } +} + +#[stable(feature = "rw_lock_default", since = "1.10.0")] +impl Default for RwLock { + /// Creates a new `RwLock`, with the `Default` value for T. + fn default() -> RwLock { + RwLock::new(Default::default()) + } +} + +#[stable(feature = "rw_lock_from", since = "1.24.0")] +impl From for RwLock { + /// Creates a new instance of an `RwLock` which is unlocked. + /// This is equivalent to [`RwLock::new`]. + fn from(t: T) -> Self { + RwLock::new(t) + } +} + +impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { + /// Create a new instance of `RwLockReadGuard` from a `RwLock`. + // SAFETY: if and only if `lock.inner.read()` (or `lock.inner.try_read()`) has been + // successfully called from the same thread before instantiating this object. + unsafe fn new(lock: &'rwlock RwLock) -> LockResult> { + poison::map_result(lock.poison.borrow(), |()| RwLockReadGuard { + data: NonNull::new_unchecked(lock.data.get()), + inner_lock: &lock.inner, + }) + } +} + +impl<'rwlock, T: ?Sized> RwLockWriteGuard<'rwlock, T> { + /// Create a new instance of `RwLockWriteGuard` from a `RwLock`. + // SAFETY: if and only if `lock.inner.write()` (or `lock.inner.try_write()`) has been + // successfully called from the same thread before instantiating this object. + unsafe fn new(lock: &'rwlock RwLock) -> LockResult> { + poison::map_result(lock.poison.guard(), |guard| RwLockWriteGuard { lock, poison: guard }) + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl fmt::Display for RwLockReadGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl fmt::Debug for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl fmt::Display for RwLockWriteGuard<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Deref for RwLockReadGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: the conditions of `RwLockGuard::new` were satisfied when created. 
+ unsafe { self.data.as_ref() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Deref for RwLockWriteGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when created. + unsafe { &*self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl DerefMut for RwLockWriteGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when created. + unsafe { &mut *self.lock.data.get() } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for RwLockReadGuard<'_, T> { + fn drop(&mut self) { + // SAFETY: the conditions of `RwLockReadGuard::new` were satisfied when created. + unsafe { + self.inner_lock.read_unlock(); + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl Drop for RwLockWriteGuard<'_, T> { + fn drop(&mut self) { + self.lock.poison.done(&self.poison); + // SAFETY: the conditions of `RwLockWriteGuard::new` were satisfied when created. + unsafe { + self.lock.inner.write_unlock(); + } + } +} diff --git a/library/std/src/sync/rwlock/tests.rs b/library/std/src/sync/rwlock/tests.rs new file mode 100644 index 000000000..08255c985 --- /dev/null +++ b/library/std/src/sync/rwlock/tests.rs @@ -0,0 +1,259 @@ +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::mpsc::channel; +use crate::sync::{Arc, RwLock, RwLockReadGuard, TryLockError}; +use crate::thread; +use rand::{self, Rng}; + +#[derive(Eq, PartialEq, Debug)] +struct NonCopy(i32); + +#[test] +fn smoke() { + let l = RwLock::new(()); + drop(l.read().unwrap()); + drop(l.write().unwrap()); + drop((l.read().unwrap(), l.read().unwrap())); + drop(l.write().unwrap()); +} + +#[test] +fn frob() { + const N: u32 = 10; + const M: usize = 1000; + + let r = Arc::new(RwLock::new(())); + + let (tx, rx) = channel::<()>(); + for _ in 0..N { + let tx = tx.clone(); + let r = r.clone(); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / (N as f64)) { + drop(r.write().unwrap()); + } else { + drop(r.read().unwrap()); + } + } + drop(tx); + }); + } + drop(tx); + let _ = rx.recv(); +} + +#[test] +fn test_rw_arc_poison_wr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.read().is_err()); +} + +#[test] +fn test_rw_arc_poison_ww() { + let arc = Arc::new(RwLock::new(1)); + assert!(!arc.is_poisoned()); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.write().unwrap(); + panic!(); + }) + .join(); + assert!(arc.write().is_err()); + assert!(arc.is_poisoned()); +} + +#[test] +fn test_rw_arc_no_poison_rr() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 1); +} +#[test] +fn test_rw_arc_no_poison_rw() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _: Result<(), _> = thread::spawn(move || { + let _lock = arc2.read().unwrap(); + panic!() + }) + .join(); + let lock = arc.write().unwrap(); + assert_eq!(*lock, 1); +} + +#[test] +fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + let (tx, rx) = channel(); + + thread::spawn(move || { + let mut lock = arc2.write().unwrap(); + for _ in 0..10 { 
+ let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + tx.send(()).unwrap(); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read().unwrap(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + rx.recv().unwrap(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 10); +} + +#[test] +fn test_rw_arc_access_in_unwind() { + let arc = Arc::new(RwLock::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + let mut lock = self.i.write().unwrap(); + *lock += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.read().unwrap(); + assert_eq!(*lock, 2); +} + +#[test] +fn test_rwlock_unsized() { + let rw: &RwLock<[i32]> = &RwLock::new([1, 2, 3]); + { + let b = &mut *rw.write().unwrap(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*rw.read().unwrap(), comp); +} + +#[test] +fn test_rwlock_try_write() { + let lock = RwLock::new(0isize); + let read_guard = lock.read().unwrap(); + + let write_result = lock.try_write(); + match write_result { + Err(TryLockError::WouldBlock) => (), + Ok(_) => assert!(false, "try_write should not succeed while read_guard is in scope"), + Err(_) => assert!(false, "unexpected error"), + } + + drop(read_guard); +} + +#[test] +fn test_into_inner() { + let m = RwLock::new(NonCopy(10)); + assert_eq!(m.into_inner().unwrap(), NonCopy(10)); +} + +#[test] +fn test_into_inner_drop() { + struct Foo(Arc); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = RwLock::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner().unwrap(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); +} + +#[test] +fn test_into_inner_poison() { + let m = Arc::new(RwLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison RwLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().into_inner() { + Err(e) => assert_eq!(e.into_inner(), NonCopy(10)), + Ok(x) => panic!("into_inner of poisoned RwLock is Ok: {x:?}"), + } +} + +#[test] +fn test_get_mut() { + let mut m = RwLock::new(NonCopy(10)); + *m.get_mut().unwrap() = NonCopy(20); + assert_eq!(m.into_inner().unwrap(), NonCopy(20)); +} + +#[test] +fn test_get_mut_poison() { + let m = Arc::new(RwLock::new(NonCopy(10))); + let m2 = m.clone(); + let _ = thread::spawn(move || { + let _lock = m2.write().unwrap(); + panic!("test panic in inner thread to poison RwLock"); + }) + .join(); + + assert!(m.is_poisoned()); + match Arc::try_unwrap(m).unwrap().get_mut() { + Err(e) => assert_eq!(*e.into_inner(), NonCopy(10)), + Ok(x) => panic!("get_mut of poisoned RwLock is Ok: {x:?}"), + } +} + +#[test] +fn test_read_guard_covariance() { + fn do_stuff<'a>(_: RwLockReadGuard<'_, &'a i32>, _: &'a i32) {} + let j: i32 = 5; + let lock = RwLock::new(&j); + { + let i = 6; + do_stuff(lock.read().unwrap(), &i); + } + drop(lock); +} -- cgit v1.2.3