diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
commit | 698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch) | |
tree | 173a775858bd501c378080a10dca74132f05bc50 /library/std/src/sys_common/thread_parker | |
parent | Initial commit. (diff) | |
download | rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip |
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sys_common/thread_parker')
-rw-r--r-- | library/std/src/sys_common/thread_parker/futex.rs | 97 | ||||
-rw-r--r-- | library/std/src/sys_common/thread_parker/generic.rs | 125 | ||||
-rw-r--r-- | library/std/src/sys_common/thread_parker/mod.rs | 22 | ||||
-rw-r--r-- | library/std/src/sys_common/thread_parker/wait_flag.rs | 102 |
4 files changed, 346 insertions, 0 deletions
diff --git a/library/std/src/sys_common/thread_parker/futex.rs b/library/std/src/sys_common/thread_parker/futex.rs new file mode 100644 index 000000000..d9e2f39e3 --- /dev/null +++ b/library/std/src/sys_common/thread_parker/futex.rs @@ -0,0 +1,97 @@ +use crate::pin::Pin; +use crate::sync::atomic::AtomicU32; +use crate::sync::atomic::Ordering::{Acquire, Release}; +use crate::sys::futex::{futex_wait, futex_wake}; +use crate::time::Duration; + +const PARKED: u32 = u32::MAX; +const EMPTY: u32 = 0; +const NOTIFIED: u32 = 1; + +pub struct Parker { + state: AtomicU32, +} + +// Notes about memory ordering: +// +// Memory ordering is only relevant for the relative ordering of operations +// between different variables. Even Ordering::Relaxed guarantees a +// monotonic/consistent order when looking at just a single atomic variable. +// +// So, since this parker is just a single atomic variable, we only need to look +// at the ordering guarantees we need to provide to the 'outside world'. +// +// The only memory ordering guarantee that parking and unparking provide, is +// that things which happened before unpark() are visible on the thread +// returning from park() afterwards. Otherwise, it was effectively unparked +// before unpark() was called while still consuming the 'token'. +// +// In other words, unpark() needs to synchronize with the part of park() that +// consumes the token and returns. +// +// This is done with a release-acquire synchronization, by using +// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using +// Ordering::Acquire when checking for this state in park(). +impl Parker { + /// Construct the futex parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Self { state: AtomicU32::new(EMPTY) }); + } + + // Assumes this is only called by the thread that owns the Parker, + // which means that `self.state != PARKED`. + pub unsafe fn park(self: Pin<&Self>) { + // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the + // first case. + if self.state.fetch_sub(1, Acquire) == NOTIFIED { + return; + } + loop { + // Wait for something to happen, assuming it's still set to PARKED. + futex_wait(&self.state, PARKED, None); + // Change NOTIFIED=>EMPTY and return in that case. + if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() { + return; + } else { + // Spurious wake up. We loop to try again. + } + } + } + + // Assumes this is only called by the thread that owns the Parker, + // which means that `self.state != PARKED`. This implementation doesn't + // require `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) { + // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the + // first case. + if self.state.fetch_sub(1, Acquire) == NOTIFIED { + return; + } + // Wait for something to happen, assuming it's still set to PARKED. + futex_wait(&self.state, PARKED, Some(timeout)); + // This is not just a store, because we need to establish a + // release-acquire ordering with unpark(). + if self.state.swap(EMPTY, Acquire) == NOTIFIED { + // Woke up because of unpark(). + } else { + // Timeout or spurious wake up. + // We return either way, because we can't easily tell if it was the + // timeout or not. + } + } + + // This implementation doesn't require `Pin`, but other implementations do. + #[inline] + pub fn unpark(self: Pin<&Self>) { + // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and + // wake the thread in the first case. + // + // Note that even NOTIFIED=>NOTIFIED results in a write. This is on + // purpose, to make sure every unpark() has a release-acquire ordering + // with park(). + if self.state.swap(NOTIFIED, Release) == PARKED { + futex_wake(&self.state); + } + } +} diff --git a/library/std/src/sys_common/thread_parker/generic.rs b/library/std/src/sys_common/thread_parker/generic.rs new file mode 100644 index 000000000..f3d8b34d3 --- /dev/null +++ b/library/std/src/sys_common/thread_parker/generic.rs @@ -0,0 +1,125 @@ +//! Parker implementation based on a Mutex and Condvar. + +use crate::pin::Pin; +use crate::sync::atomic::AtomicUsize; +use crate::sync::atomic::Ordering::SeqCst; +use crate::sync::{Condvar, Mutex}; +use crate::time::Duration; + +const EMPTY: usize = 0; +const PARKED: usize = 1; +const NOTIFIED: usize = 2; + +pub struct Parker { + state: AtomicUsize, + lock: Mutex<()>, + cvar: Condvar, +} + +impl Parker { + /// Construct the generic parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Parker { + state: AtomicUsize::new(EMPTY), + lock: Mutex::new(()), + cvar: Condvar::new(), + }); + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { + // If we were previously notified then we consume this notification and + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + + // Otherwise we need to coordinate going to sleep + let mut m = self.lock.lock().unwrap(); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => panic!("inconsistent park state"), + } + loop { + m = self.cvar.wait(m).unwrap(); + match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) { + Ok(_) => return, // got a notification + Err(_) => {} // spurious wakeup, go back to sleep + } + } + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + // Like `park` above we have a fast path for an already-notified thread, and + // afterwards we start coordinating for a sleep. + // return quickly. + if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() { + return; + } + let m = self.lock.lock().unwrap(); + match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read again here, see `park`. + let old = self.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + return; + } // should consume this notification, so prohibit spurious wakeups in next park. + Err(_) => panic!("inconsistent park_timeout state"), + } + + // Wait with a timeout, and if we spuriously wake up or otherwise wake up + // from a notification we just want to unconditionally set the state back to + // empty, either consuming a notification or un-flagging ourselves as + // parked. + let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap(); + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {n}"), + } + } + + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { + // To ensure the unparked thread will observe any writes we made + // before this call, we must perform a release operation that `park` + // can synchronize with. To do that we must write `NOTIFIED` even if + // `state` is already `NOTIFIED`. That is why this must be a swap + // rather than a compare-and-swap that returns if it reads `NOTIFIED` + // on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), + } + + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.lock.lock().unwrap()); + self.cvar.notify_one() + } +} diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs new file mode 100644 index 000000000..cbd7832eb --- /dev/null +++ b/library/std/src/sys_common/thread_parker/mod.rs @@ -0,0 +1,22 @@ +cfg_if::cfg_if! { + if #[cfg(any( + target_os = "linux", + target_os = "android", + all(target_arch = "wasm32", target_feature = "atomics"), + target_os = "freebsd", + target_os = "openbsd", + target_os = "dragonfly", + target_os = "fuchsia", + ))] { + mod futex; + pub use futex::Parker; + } else if #[cfg(target_os = "solid_asp3")] { + mod wait_flag; + pub use wait_flag::Parker; + } else if #[cfg(any(windows, target_family = "unix"))] { + pub use crate::sys::thread_parker::Parker; + } else { + mod generic; + pub use generic::Parker; + } +} diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parker/wait_flag.rs new file mode 100644 index 000000000..6561c1866 --- /dev/null +++ b/library/std/src/sys_common/thread_parker/wait_flag.rs @@ -0,0 +1,102 @@ +//! A wait-flag-based thread parker. +//! +//! Some operating systems provide low-level parking primitives like wait counts, +//! event flags or semaphores which are not susceptible to race conditions (meaning +//! the wakeup can occur before the wait operation). To implement the `std` thread +//! parker on top of these primitives, we only have to ensure that parking is fast +//! when the thread token is available, the atomic ordering guarantees are maintained +//! and spurious wakeups are minimized. +//! +//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`, +//! `PARKED` and `NOTIFIED`: +//! * `EMPTY` means the token has not been made available, but the thread is not +//! currently waiting on it. +//! * `PARKED` means the token is not available and the thread is parked. +//! * `NOTIFIED` means the token is available. +//! +//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from +//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and +//! execution can continue without calling into the OS. If the state was `EMPTY`, +//! the token is not available and the thread waits on the primitive (here called +//! "wait flag"). +//! +//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread +//! is or will be sleeping on the wait flag, so we raise it. + +use crate::pin::Pin; +use crate::sync::atomic::AtomicI8; +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use crate::sys::wait_flag::WaitFlag; +use crate::time::Duration; + +const EMPTY: i8 = 0; +const PARKED: i8 = -1; +const NOTIFIED: i8 = 1; + +pub struct Parker { + state: AtomicI8, + wait_flag: WaitFlag, +} + +impl Parker { + /// Construct a parker for the current thread. The UNIX parker + /// implementation requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() }) + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { + match self.state.fetch_sub(1, Acquire) { + // NOTIFIED => EMPTY + NOTIFIED => return, + // EMPTY => PARKED + EMPTY => (), + _ => panic!("inconsistent park state"), + } + + // Avoid waking up from spurious wakeups (these are quite likely, see below). + loop { + self.wait_flag.wait(); + + match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) { + Ok(_) => return, + Err(PARKED) => (), + Err(_) => panic!("inconsistent park state"), + } + } + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + match self.state.fetch_sub(1, Acquire) { + NOTIFIED => return, + EMPTY => (), + _ => panic!("inconsistent park state"), + } + + self.wait_flag.wait_timeout(dur); + + // Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be + // a race condition when `unpark` is performed between receiving the timeout and + // resetting the state, resulting in the eventflag being set unnecessarily. `park` + // is protected against this by looping until the token is actually given, but + // here we cannot easily tell. + + // Use `swap` to provide acquire ordering. + match self.state.swap(EMPTY, Acquire) { + NOTIFIED => (), + PARKED => (), + _ => panic!("inconsistent park state"), + } + } + + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { + let state = self.state.swap(NOTIFIED, Release); + + if state == PARKED { + self.wait_flag.raise(); + } + } +} |