//! Thread parking using thread ids. //! //! Some platforms (notably NetBSD) have thread parking primitives whose semantics //! match those offered by `thread::park`, with the difference that the thread to //! be unparked is referenced by a platform-specific thread id. Since the thread //! parker is constructed before that id is known, an atomic state variable is used //! to manage the park state and propagate the thread id. This also avoids platform //! calls in the case where `unpark` is called before `park`. use crate::cell::UnsafeCell; use crate::pin::Pin; use crate::sync::atomic::{ fence, AtomicI8, Ordering::{Acquire, Relaxed, Release}, }; use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId}; use crate::time::Duration; pub struct Parker { state: AtomicI8, tid: UnsafeCell>, } const PARKED: i8 = -1; const EMPTY: i8 = 0; const NOTIFIED: i8 = 1; impl Parker { pub fn new() -> Parker { Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) } } /// Create a new thread parker. UNIX requires this to happen in-place. pub unsafe fn new_in_place(parker: *mut Parker) { parker.write(Parker::new()) } /// # Safety /// * must always be called from the same thread /// * must be called before the state is set to PARKED unsafe fn init_tid(&self) { // The field is only ever written to from this thread, so we don't need // synchronization to read it here. if self.tid.get().read().is_none() { // Because this point is only reached once, before the state is set // to PARKED for the first time, the non-atomic write here can not // conflict with reads by other threads. self.tid.get().write(Some(current())); // Ensure that the write can be observed by all threads reading the // state. Synchronizes with the acquire barrier in `unpark`. fence(Release); } } pub unsafe fn park(self: Pin<&Self>) { self.init_tid(); // Changes NOTIFIED to EMPTY and EMPTY to PARKED. let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1); if state == PARKED { // Loop to guard against spurious wakeups. while state == PARKED { park(self.state.as_mut_ptr().addr()); state = self.state.load(Acquire); } // Since the state change has already been observed with acquire // ordering, the state can be reset with a relaxed store instead // of a swap. self.state.store(EMPTY, Relaxed); } } pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { self.init_tid(); let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1); if state == PARKED { park_timeout(dur, self.state.as_mut_ptr().addr()); // Swap to ensure that we observe all state changes with acquire // ordering, even if the state has been changed after the timeout // occured. self.state.swap(EMPTY, Acquire); } } pub fn unpark(self: Pin<&Self>) { let state = self.state.swap(NOTIFIED, Release); if state == PARKED { // Synchronize with the release fence in `init_tid` to observe the // write to `tid`. fence(Acquire); // # Safety // The thread id is initialized before the state is set to `PARKED` // for the first time and is not written to from that point on // (negating the need for an atomic read). let tid = unsafe { self.tid.get().read().unwrap_unchecked() }; // It is possible that the waiting thread woke up because of a timeout // and terminated before this call is made. This call then returns an // error or wakes up an unrelated thread. The platform API and // environment does allow this, however. unpark(tid, self.state.as_mut_ptr().addr()); } } } unsafe impl Send for Parker {} unsafe impl Sync for Parker {}