summaryrefslogtreecommitdiffstats
path: root/library/std/src/sys_common/thread_parking
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:19:13 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:19:13 +0000
commit218caa410aa38c29984be31a5229b9fa717560ee (patch)
treec54bd55eeb6e4c508940a30e94c0032fbd45d677 /library/std/src/sys_common/thread_parking
parentReleasing progress-linux version 1.67.1+dfsg1-1~progress7.99u1. (diff)
downloadrustc-218caa410aa38c29984be31a5229b9fa717560ee.tar.xz
rustc-218caa410aa38c29984be31a5229b9fa717560ee.zip
Merging upstream version 1.68.2+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sys_common/thread_parking')
-rw-r--r--library/std/src/sys_common/thread_parking/futex.rs97
-rw-r--r--library/std/src/sys_common/thread_parking/generic.rs125
-rw-r--r--library/std/src/sys_common/thread_parking/id.rs108
-rw-r--r--library/std/src/sys_common/thread_parking/mod.rs29
-rw-r--r--library/std/src/sys_common/thread_parking/wait_flag.rs102
5 files changed, 461 insertions, 0 deletions
diff --git a/library/std/src/sys_common/thread_parking/futex.rs b/library/std/src/sys_common/thread_parking/futex.rs
new file mode 100644
index 000000000..588e7b278
--- /dev/null
+++ b/library/std/src/sys_common/thread_parking/futex.rs
@@ -0,0 +1,97 @@
+use crate::pin::Pin;
+use crate::sync::atomic::AtomicU32;
+use crate::sync::atomic::Ordering::{Acquire, Release};
+use crate::sys::futex::{futex_wait, futex_wake};
+use crate::time::Duration;
+
+const PARKED: u32 = u32::MAX;
+const EMPTY: u32 = 0;
+const NOTIFIED: u32 = 1;
+
+pub struct Parker {
+ state: AtomicU32,
+}
+
+// Notes about memory ordering:
+//
+// Memory ordering is only relevant for the relative ordering of operations
+// between different variables. Even Ordering::Relaxed guarantees a
+// monotonic/consistent order when looking at just a single atomic variable.
+//
+// So, since this parker is just a single atomic variable, we only need to look
+// at the ordering guarantees we need to provide to the 'outside world'.
+//
+// The only memory ordering guarantee that parking and unparking provide, is
+// that things which happened before unpark() are visible on the thread
+// returning from park() afterwards. Otherwise, it was effectively unparked
+// before unpark() was called while still consuming the 'token'.
+//
+// In other words, unpark() needs to synchronize with the part of park() that
+// consumes the token and returns.
+//
+// This is done with a release-acquire synchronization, by using
+// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
+// Ordering::Acquire when checking for this state in park().
+impl Parker {
+ /// Construct the futex parker. The UNIX parker implementation
+ /// requires this to happen in-place.
+ pub unsafe fn new_in_place(parker: *mut Parker) {
+ parker.write(Self { state: AtomicU32::new(EMPTY) });
+ }
+
+ // Assumes this is only called by the thread that owns the Parker,
+ // which means that `self.state != PARKED`.
+ pub unsafe fn park(self: Pin<&Self>) {
+ // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+ // first case.
+ if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+ return;
+ }
+ loop {
+ // Wait for something to happen, assuming it's still set to PARKED.
+ futex_wait(&self.state, PARKED, None);
+ // Change NOTIFIED=>EMPTY and return in that case.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
+ return;
+ } else {
+ // Spurious wake up. We loop to try again.
+ }
+ }
+ }
+
+ // Assumes this is only called by the thread that owns the Parker,
+ // which means that `self.state != PARKED`. This implementation doesn't
+ // require `Pin`, but other implementations do.
+ pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
+ // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
+ // first case.
+ if self.state.fetch_sub(1, Acquire) == NOTIFIED {
+ return;
+ }
+ // Wait for something to happen, assuming it's still set to PARKED.
+ futex_wait(&self.state, PARKED, Some(timeout));
+ // This is not just a store, because we need to establish a
+ // release-acquire ordering with unpark().
+ if self.state.swap(EMPTY, Acquire) == NOTIFIED {
+ // Woke up because of unpark().
+ } else {
+ // Timeout or spurious wake up.
+ // We return either way, because we can't easily tell if it was the
+ // timeout or not.
+ }
+ }
+
+ // This implementation doesn't require `Pin`, but other implementations do.
+ #[inline]
+ pub fn unpark(self: Pin<&Self>) {
+ // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
+ // wake the thread in the first case.
+ //
+ // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
+ // purpose, to make sure every unpark() has a release-acquire ordering
+ // with park().
+ if self.state.swap(NOTIFIED, Release) == PARKED {
+ futex_wake(&self.state);
+ }
+ }
+}
diff --git a/library/std/src/sys_common/thread_parking/generic.rs b/library/std/src/sys_common/thread_parking/generic.rs
new file mode 100644
index 000000000..3209bffe3
--- /dev/null
+++ b/library/std/src/sys_common/thread_parking/generic.rs
@@ -0,0 +1,125 @@
+//! Parker implementation based on a Mutex and Condvar.
+
+use crate::pin::Pin;
+use crate::sync::atomic::AtomicUsize;
+use crate::sync::atomic::Ordering::SeqCst;
+use crate::sync::{Condvar, Mutex};
+use crate::time::Duration;
+
+const EMPTY: usize = 0;
+const PARKED: usize = 1;
+const NOTIFIED: usize = 2;
+
+pub struct Parker {
+ state: AtomicUsize,
+ lock: Mutex<()>,
+ cvar: Condvar,
+}
+
+impl Parker {
+ /// Construct the generic parker. The UNIX parker implementation
+ /// requires this to happen in-place.
+ pub unsafe fn new_in_place(parker: *mut Parker) {
+ parker.write(Parker {
+ state: AtomicUsize::new(EMPTY),
+ lock: Mutex::new(()),
+ cvar: Condvar::new(),
+ });
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park(self: Pin<&Self>) {
+ // If we were previously notified then we consume this notification and
+ // return quickly.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ return;
+ }
+
+ // Otherwise we need to coordinate going to sleep
+ let mut m = self.lock.lock().unwrap();
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read here, even though we know it will be `NOTIFIED`.
+ // This is because `unpark` may have been called again since we read
+ // `NOTIFIED` in the `compare_exchange` above. We must perform an
+ // acquire operation that synchronizes with that `unpark` to observe
+ // any writes it made before the call to unpark. To do that we must
+ // read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ } // should consume this notification, so prohibit spurious wakeups in next park.
+ Err(_) => panic!("inconsistent park state"),
+ }
+ loop {
+ m = self.cvar.wait(m).unwrap();
+ match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
+ Ok(_) => return, // got a notification
+ Err(_) => {} // spurious wakeup, go back to sleep
+ }
+ }
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+ // Like `park` above we have a fast path for an already-notified thread, and
+ // afterwards we start coordinating for a sleep.
+ // return quickly.
+ if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
+ return;
+ }
+ let m = self.lock.lock().unwrap();
+ match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read again here, see `park`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+ return;
+ } // should consume this notification, so prohibit spurious wakeups in next park.
+ Err(_) => panic!("inconsistent park_timeout state"),
+ }
+
+ // Wait with a timeout, and if we spuriously wake up or otherwise wake up
+ // from a notification we just want to unconditionally set the state back to
+ // empty, either consuming a notification or un-flagging ourselves as
+ // parked.
+ let (_m, _result) = self.cvar.wait_timeout(m, dur).unwrap();
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED => {} // got a notification, hurray!
+ PARKED => {} // no notification, alas
+ n => panic!("inconsistent park_timeout state: {n}"),
+ }
+ }
+
+ // This implementation doesn't require `Pin`, but other implementations do.
+ pub fn unpark(self: Pin<&Self>) {
+ // To ensure the unparked thread will observe any writes we made
+ // before this call, we must perform a release operation that `park`
+ // can synchronize with. To do that we must write `NOTIFIED` even if
+ // `state` is already `NOTIFIED`. That is why this must be a swap
+ // rather than a compare-and-swap that returns if it reads `NOTIFIED`
+ // on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => return, // no one was waiting
+ NOTIFIED => return, // already unparked
+ PARKED => {} // gotta go wake someone up
+ _ => panic!("inconsistent state in unpark"),
+ }
+
+ // There is a period between when the parked thread sets `state` to
+ // `PARKED` (or last checked `state` in the case of a spurious wake
+ // up) and when it actually waits on `cvar`. If we were to notify
+ // during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has
+ // `lock` locked at this stage so we can acquire `lock` to wait until
+ // it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the
+ // parked thread wakes it doesn't get woken only to have to wait for us
+ // to release `lock`.
+ drop(self.lock.lock().unwrap());
+ self.cvar.notify_one()
+ }
+}
diff --git a/library/std/src/sys_common/thread_parking/id.rs b/library/std/src/sys_common/thread_parking/id.rs
new file mode 100644
index 000000000..e98169597
--- /dev/null
+++ b/library/std/src/sys_common/thread_parking/id.rs
@@ -0,0 +1,108 @@
+//! Thread parking using thread ids.
+//!
+//! Some platforms (notably NetBSD) have thread parking primitives whose semantics
+//! match those offered by `thread::park`, with the difference that the thread to
+//! be unparked is referenced by a platform-specific thread id. Since the thread
+//! parker is constructed before that id is known, an atomic state variable is used
+//! to manage the park state and propagate the thread id. This also avoids platform
+//! calls in the case where `unpark` is called before `park`.
+
+use crate::cell::UnsafeCell;
+use crate::pin::Pin;
+use crate::sync::atomic::{
+ fence, AtomicI8,
+ Ordering::{Acquire, Relaxed, Release},
+};
+use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId};
+use crate::time::Duration;
+
+pub struct Parker {
+ state: AtomicI8,
+ tid: UnsafeCell<Option<ThreadId>>,
+}
+
+const PARKED: i8 = -1;
+const EMPTY: i8 = 0;
+const NOTIFIED: i8 = 1;
+
+impl Parker {
+ pub fn new() -> Parker {
+ Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) }
+ }
+
+ /// Create a new thread parker. UNIX requires this to happen in-place.
+ pub unsafe fn new_in_place(parker: *mut Parker) {
+ parker.write(Parker::new())
+ }
+
+ /// # Safety
+ /// * must always be called from the same thread
+ /// * must be called before the state is set to PARKED
+ unsafe fn init_tid(&self) {
+ // The field is only ever written to from this thread, so we don't need
+ // synchronization to read it here.
+ if self.tid.get().read().is_none() {
+ // Because this point is only reached once, before the state is set
+ // to PARKED for the first time, the non-atomic write here can not
+ // conflict with reads by other threads.
+ self.tid.get().write(Some(current()));
+ // Ensure that the write can be observed by all threads reading the
+ // state. Synchronizes with the acquire barrier in `unpark`.
+ fence(Release);
+ }
+ }
+
+ pub unsafe fn park(self: Pin<&Self>) {
+ self.init_tid();
+
+ // Changes NOTIFIED to EMPTY and EMPTY to PARKED.
+ let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
+ if state == PARKED {
+ // Loop to guard against spurious wakeups.
+ while state == PARKED {
+ park(self.state.as_mut_ptr().addr());
+ state = self.state.load(Acquire);
+ }
+
+ // Since the state change has already been observed with acquire
+ // ordering, the state can be reset with a relaxed store instead
+ // of a swap.
+ self.state.store(EMPTY, Relaxed);
+ }
+ }
+
+ pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+ self.init_tid();
+
+ let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
+ if state == PARKED {
+ park_timeout(dur, self.state.as_mut_ptr().addr());
+ // Swap to ensure that we observe all state changes with acquire
+ // ordering, even if the state has been changed after the timeout
+ // occured.
+ self.state.swap(EMPTY, Acquire);
+ }
+ }
+
+ pub fn unpark(self: Pin<&Self>) {
+ let state = self.state.swap(NOTIFIED, Release);
+ if state == PARKED {
+ // Synchronize with the release fence in `init_tid` to observe the
+ // write to `tid`.
+ fence(Acquire);
+ // # Safety
+ // The thread id is initialized before the state is set to `PARKED`
+ // for the first time and is not written to from that point on
+ // (negating the need for an atomic read).
+ let tid = unsafe { self.tid.get().read().unwrap_unchecked() };
+ // It is possible that the waiting thread woke up because of a timeout
+ // and terminated before this call is made. This call then returns an
+ // error or wakes up an unrelated thread. The platform API and
+ // environment does allow this, however.
+ unpark(tid, self.state.as_mut_ptr().addr());
+ }
+ }
+}
+
+unsafe impl Send for Parker {}
+unsafe impl Sync for Parker {}
diff --git a/library/std/src/sys_common/thread_parking/mod.rs b/library/std/src/sys_common/thread_parking/mod.rs
new file mode 100644
index 000000000..0ead6633c
--- /dev/null
+++ b/library/std/src/sys_common/thread_parking/mod.rs
@@ -0,0 +1,29 @@
+cfg_if::cfg_if! {
+ if #[cfg(any(
+ target_os = "linux",
+ target_os = "android",
+ all(target_arch = "wasm32", target_feature = "atomics"),
+ target_os = "freebsd",
+ target_os = "openbsd",
+ target_os = "dragonfly",
+ target_os = "fuchsia",
+ target_os = "hermit",
+ ))] {
+ mod futex;
+ pub use futex::Parker;
+ } else if #[cfg(any(
+ target_os = "netbsd",
+ all(target_vendor = "fortanix", target_env = "sgx"),
+ ))] {
+ mod id;
+ pub use id::Parker;
+ } else if #[cfg(target_os = "solid_asp3")] {
+ mod wait_flag;
+ pub use wait_flag::Parker;
+ } else if #[cfg(any(windows, target_family = "unix"))] {
+ pub use crate::sys::thread_parking::Parker;
+ } else {
+ mod generic;
+ pub use generic::Parker;
+ }
+}
diff --git a/library/std/src/sys_common/thread_parking/wait_flag.rs b/library/std/src/sys_common/thread_parking/wait_flag.rs
new file mode 100644
index 000000000..d0f8899a9
--- /dev/null
+++ b/library/std/src/sys_common/thread_parking/wait_flag.rs
@@ -0,0 +1,102 @@
+//! A wait-flag-based thread parker.
+//!
+//! Some operating systems provide low-level parking primitives like wait counts,
+//! event flags or semaphores which are not susceptible to race conditions (meaning
+//! the wakeup can occur before the wait operation). To implement the `std` thread
+//! parker on top of these primitives, we only have to ensure that parking is fast
+//! when the thread token is available, the atomic ordering guarantees are maintained
+//! and spurious wakeups are minimized.
+//!
+//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`,
+//! `PARKED` and `NOTIFIED`:
+//! * `EMPTY` means the token has not been made available, but the thread is not
+//! currently waiting on it.
+//! * `PARKED` means the token is not available and the thread is parked.
+//! * `NOTIFIED` means the token is available.
+//!
+//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from
+//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and
+//! execution can continue without calling into the OS. If the state was `EMPTY`,
+//! the token is not available and the thread waits on the primitive (here called
+//! "wait flag").
+//!
+//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread
+//! is or will be sleeping on the wait flag, so we raise it.
+
+use crate::pin::Pin;
+use crate::sync::atomic::AtomicI8;
+use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+use crate::sys::wait_flag::WaitFlag;
+use crate::time::Duration;
+
+const EMPTY: i8 = 0;
+const PARKED: i8 = -1;
+const NOTIFIED: i8 = 1;
+
+pub struct Parker {
+ state: AtomicI8,
+ wait_flag: WaitFlag,
+}
+
+impl Parker {
+ /// Construct a parker for the current thread. The UNIX parker
+ /// implementation requires this to happen in-place.
+ pub unsafe fn new_in_place(parker: *mut Parker) {
+ parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() })
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park(self: Pin<&Self>) {
+ match self.state.fetch_sub(1, Acquire) {
+ // NOTIFIED => EMPTY
+ NOTIFIED => return,
+ // EMPTY => PARKED
+ EMPTY => (),
+ _ => panic!("inconsistent park state"),
+ }
+
+ // Avoid waking up from spurious wakeups (these are quite likely, see below).
+ loop {
+ self.wait_flag.wait();
+
+ match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) {
+ Ok(_) => return,
+ Err(PARKED) => (),
+ Err(_) => panic!("inconsistent park state"),
+ }
+ }
+ }
+
+ // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+ pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+ match self.state.fetch_sub(1, Acquire) {
+ NOTIFIED => return,
+ EMPTY => (),
+ _ => panic!("inconsistent park state"),
+ }
+
+ self.wait_flag.wait_timeout(dur);
+
+ // Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be
+ // a race condition when `unpark` is performed between receiving the timeout and
+ // resetting the state, resulting in the eventflag being set unnecessarily. `park`
+ // is protected against this by looping until the token is actually given, but
+ // here we cannot easily tell.
+
+ // Use `swap` to provide acquire ordering.
+ match self.state.swap(EMPTY, Acquire) {
+ NOTIFIED => (),
+ PARKED => (),
+ _ => panic!("inconsistent park state"),
+ }
+ }
+
+ // This implementation doesn't require `Pin`, but other implementations do.
+ pub fn unpark(self: Pin<&Self>) {
+ let state = self.state.swap(NOTIFIED, Release);
+
+ if state == PARKED {
+ self.wait_flag.raise();
+ }
+ }
+}