summaryrefslogtreecommitdiffstats
path: root/library/std/src/sys_common/thread_parker
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
commit698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch)
tree173a775858bd501c378080a10dca74132f05bc50 /library/std/src/sys_common/thread_parker
parentInitial commit. (diff)
downloadrustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz
rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sys_common/thread_parker')
-rw-r--r--library/std/src/sys_common/thread_parker/futex.rs97
-rw-r--r--library/std/src/sys_common/thread_parker/generic.rs125
-rw-r--r--library/std/src/sys_common/thread_parker/mod.rs22
-rw-r--r--library/std/src/sys_common/thread_parker/wait_flag.rs102
4 files changed, 346 insertions, 0 deletions
diff --git a/library/std/src/sys_common/thread_parker/futex.rs b/library/std/src/sys_common/thread_parker/futex.rs
new file mode 100644
index 000000000..d9e2f39e3
--- /dev/null
+++ b/library/std/src/sys_common/thread_parker/futex.rs
@@ -0,0 +1,97 @@
+use crate::pin::Pin;
+use crate::sync::atomic::AtomicU32;
+use crate::sync::atomic::Ordering::{Acquire, Release};
+use crate::sys::futex::{futex_wait, futex_wake};
+use crate::time::Duration;
+
+const PARKED: u32 = u32::MAX;
+const EMPTY: u32 = 0;
+const NOTIFIED: u32 = 1;
+
+pub struct Parker {
+ state: AtomicU32,
+}
+
+// Notes about memory ordering:
+//
+// Memory ordering is only relevant for the relative ordering of operations
+// between different variables. Even Ordering::Relaxed guarantees a
+// monotonic/consistent order when looking at just a single atomic variable.
+//
+// So, since this parker is just a single atomic variable, we only need to look
+// at the ordering guarantees we need to provide to the 'outside world'.
+//
+// The only memory ordering guarantee that parking and unparking provide, is
+// that things which happened before unpark() are visible on the thread
+// returning from park() afterwards. Otherwise, it was effectively unparked
+// before unpark() was called while still consuming the 'token'.
+//
+// In other words, unpark() needs to synchronize with the part of park() that
+// consumes the token and returns.
+//
+// This is done with a release-acquire synchronization, by using
+// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
+// Ordering::Acquire when checking for this state in park().
+impl Parker {
+ /// Construct the futex parker. The UNIX parker implementation
+ /// requires this to happen in-place.
+ pub unsafe fn new(parker: *mut Parker) {
+ parker.write(Self { state: AtomicU32::new(EMPTY) });
+ }
+
+ // Assumes this is only called by the thread that owns the Parker,
+ // which means that `self.state != PARKED`.
+ pub unsafe fn park(self: Pin<&Self>) {
+ // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+ // first case.
+ if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+ return;
+ }
+ loop {
+ // Wait for something to happen, assuming it's still set to PARKED.
+ futex_wait(&self.state, PARKED, None);
+ // Change NOTIFIED=>EMPTY and return in that case.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
+ return;
+ } else {
+ // Spurious wake up. We loop to try again.
+ }
+ }
+ }
+
+ // Assumes this is only called by the thread that owns the Parker,
+ // which means that `self.state != PARKED`. This implementation doesn't
+ // require `Pin`, but other implementations do.
+ pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
+ // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+ // first case.
+ if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+ return;
+ }
+ // Wait for something to happen, assuming it's still set to PARKED.
+ futex_wait(&self.state, PARKED, Some(timeout));
+ // This is not just a store, because we need to establish a
+ // release-acquire ordering with unpark().
+ if self.state.swap(EMPTY, Acquire) == NOTIFIED {
+ // Woke up because of unpark().
+ } else {
+ // Timeout or spurious wake up.
+ // We return either way, because we can't easily tell if it was the
+ // timeout or not.
+ }
+ }
+
+ // This implementation doesn't require `Pin`, but other implementations do.
+ #[inline]
+ pub fn unpark(self: Pin<&Self>) {
+ // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
+ // wake the thread in the first case.
+ //
+ // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
+ // purpose, to make sure every unpark() has a release-acquire ordering
+ // with park().
+ if self.state.swap(NOTIFIED, Release) == PARKED {
+ futex_wake(&self.state);
+ }
+ }
+}
diff --git a/library/std/src/sys_common/thread_parker/generic.rs b/library/std/src/sys_common/thread_parker/generic.rs
new file mode 100644
index 000000000..f3d8b34d3
--- /dev/null
+++ b/library/std/src/sys_common/thread_parker/generic.rs
@@ -0,0 +1,125 @@
+//! Parker implementation based on a Mutex and Condvar.
+
+use crate::pin::Pin;
+use crate::sync::atomic::AtomicUsize;
+use crate::sync::atomic::Ordering::SeqCst;
+use crate::sync::{Condvar, Mutex};
+use crate::time::Duration;
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+pub struct Parker {
+ state: AtomicUsize,
+ lock: Mutex<()>,
+ cvar: Condvar,
+}
+
+impl Parker {
+ /// Construct the generic parker. The UNIX parker implementation
+ /// requires this to happen in-place.
+ pub unsafe fn new(parker: *mut Parker) {
+ parker.write(Parker {
+ state: AtomicUsize::new(EMPTY),
+ lock: Mutex::new(()),
+ cvar: Condvar::new(),
+ });
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park(self: Pin<&Self>) {
+ // If we were previously notified then we consume this notification and
+ // return quickly.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ return;
+ }
+
+ // Otherwise we need to coordinate going to sleep
+ let mut m = self.lock.lock().unwrap();
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read here, even though we know it will be `NOTIFIED`.
+ // This is because `unpark` may have been called again since we read
+ // `NOTIFIED` in the `compare_exchange` above. We must perform an
+ // acquire operation that synchronizes with that `unpark` to observe
+ // any writes it made before the call to unpark. To do that we must
+ // read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ } // should consume this notification, so prohibit spurious wakeups in next park.
+ Err(_) => panic!("inconsistent park state"),
+ }
+ loop {
+ m = self.cvar.wait(m).unwrap();
+ match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+ Ok(_) => return, // got a notification
+ Err(_) => {} // spurious wakeup, go back to sleep
+ }
+ }
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+ // Like `park` above we have a fast path for an already-notified thread, and
+ // afterwards we start coordinating for a sleep.
+ // return quickly.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ return;
+ }
+ let m = self.lock.lock().unwrap();
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read again here, see `park`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ } // should consume this notification, so prohibit spurious wakeups in next park.
+ Err(_) => panic!("inconsistent park_timeout state"),
+ }
+
+ // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+ // from a notification we just want to unconditionally set the state back to
+ // empty, either consuming a notification or un-flagging ourselves as
+ // parked.
+ let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED => {} // got a notification, hurray!
+ PARKED => {} // no notification, alas
+ n => panic!("inconsistent park_timeout state: {n}"),
+ }
+ }
+
+ // This implementation doesn't require `Pin`, but other implementations do.
+ pub fn unpark(self: Pin<&Self>) {
+ // To ensure the unparked thread will observe any writes we made
+ // before this call, we must perform a release operation that `park`
+ // can synchronize with. To do that we must write `NOTIFIED` even if
+ // `state` is already `NOTIFIED`. That is why this must be a swap
+ // rather than a compare-and-swap that returns if it reads `NOTIFIED`
+ // on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => return, // no one was waiting
+ NOTIFIED => return, // already unparked
+ PARKED => {} // gotta go wake someone up
+ _ => panic!("inconsistent state in unpark"),
+ }
+
+ // There is a period between when the parked thread sets `state` to
+ // `PARKED` (or last checked `state` in the case of a spurious wake
+ // up) and when it actually waits on `cvar`. If we were to notify
+ // during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has
+ // `lock` locked at this stage so we can acquire `lock` to wait until
+ // it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the
+ // parked thread wakes it doesn't get woken only to have to wait for us
+ // to release `lock`.
+ drop(self.lock.lock().unwrap());
+ self.cvar.notify_one()
+ }
+}
diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs
new file mode 100644
index 000000000..cbd7832eb
--- /dev/null
+++ b/library/std/src/sys_common/thread_parker/mod.rs
@@ -0,0 +1,22 @@
+cfg_if::cfg_if! {
+ if #[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ all(target_arch = "wasm32", target_feature = "atomics"),
+ target_os = "freebsd",
+ target_os = "openbsd",
+ target_os = "dragonfly",
+ target_os = "fuchsia",
+ ))] {
+ mod futex;
+ pub use futex::Parker;
+ } else if #[cfg(target_os = "solid_asp3")] {
+ mod wait_flag;
+ pub use wait_flag::Parker;
+ } else if #[cfg(any(windows, target_family = "unix"))] {
+ pub use crate::sys::thread_parker::Parker;
+ } else {
+ mod generic;
+ pub use generic::Parker;
+ }
+}
diff --git a/library/std/src/sys_common/thread_parker/wait_flag.rs b/library/std/src/sys_common/thread_parker/wait_flag.rs
new file mode 100644
index 000000000..6561c1866
--- /dev/null
+++ b/library/std/src/sys_common/thread_parker/wait_flag.rs
@@ -0,0 +1,102 @@
+//! A wait-flag-based thread parker.
+//!
+//! Some operating systems provide low-level parking primitives like wait counts,
+//! event flags or semaphores which are not susceptible to race conditions (meaning
+//! the wakeup can occur before the wait operation). To implement the `std` thread
+//! parker on top of these primitives, we only have to ensure that parking is fast
+//! when the thread token is available, the atomic ordering guarantees are maintained
+//! and spurious wakeups are minimized.
+//!
+//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`,
+//! `PARKED` and `NOTIFIED`:
+//! * `EMPTY` means the token has not been made available, but the thread is not
+//! currently waiting on it.
+//! * `PARKED` means the token is not available and the thread is parked.
+//! * `NOTIFIED` means the token is available.
+//!
+//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from
+//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and
+//! execution can continue without calling into the OS. If the state was `EMPTY`,
+//! the token is not available and the thread waits on the primitive (here called
+//! "wait flag").
+//!
+//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread
+//! is or will be sleeping on the wait flag, so we raise it.
+
+use crate::pin::Pin;
+use crate::sync::atomic::AtomicI8;
+use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+use crate::sys::wait_flag::WaitFlag;
+use crate::time::Duration;
+
+const EMPTY: i8 = 0;
+const PARKED: i8 = -1;
+const NOTIFIED: i8 = 1;
+
+pub struct Parker {
+ state: AtomicI8,
+ wait_flag: WaitFlag,
+}
+
+impl Parker {
+ /// Construct a parker for the current thread. The UNIX parker
+ /// implementation requires this to happen in-place.
+ pub unsafe fn new(parker: *mut Parker) {
+ parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() })
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park(self: Pin<&Self>) {
+ match self.state.fetch_sub(1, Acquire) {
+ // NOTIFIED => EMPTY
+ NOTIFIED => return,
+ // EMPTY => PARKED
+ EMPTY => (),
+ _ => panic!("inconsistent park state"),
+ }
+
+ // Avoid waking up from spurious wakeups (these are quite likely, see below).
+ loop {
+ self.wait_flag.wait();
+
+ match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) {
+ Ok(_) => return,
+ Err(PARKED) => (),
+ Err(_) => panic!("inconsistent park state"),
+ }
+ }
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+ match self.state.fetch_sub(1, Acquire) {
+ NOTIFIED => return,
+ EMPTY => (),
+ _ => panic!("inconsistent park state"),
+ }
+
+ self.wait_flag.wait_timeout(dur);
+
+ // Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be
+ // a race condition when `unpark` is performed between receiving the timeout and
+ // resetting the state, resulting in the eventflag being set unnecessarily. `park`
+ // is protected against this by looping until the token is actually given, but
+ // here we cannot easily tell.
+
+ // Use `swap` to provide acquire ordering.
+ match self.state.swap(EMPTY, Acquire) {
+ NOTIFIED => (),
+ PARKED => (),
+ _ => panic!("inconsistent park state"),
+ }
+ }
+
+ // This implementation doesn't require `Pin`, but other implementations do.
+ pub fn unpark(self: Pin<&Self>) {
+ let state = self.state.swap(NOTIFIED, Release);
+
+ if state == PARKED {
+ self.wait_flag.raise();
+ }
+ }
+}