//! Thread parking for Darwin-based systems. //! //! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they //! cannot be used in `std` because they are non-public (their use will lead to //! rejection from the App Store) and because they are only available starting //! with macOS version 10.12, even though the minimum target version is 10.7. //! //! Therefore, we need to look for other synchronization primitives. Luckily, Darwin //! supports semaphores, which allow us to implement the behaviour we need with //! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore //! provided by libdispatch, as the underlying Mach semaphore is only dubiously //! public. use crate::pin::Pin; use crate::sync::atomic::{ AtomicI8, Ordering::{Acquire, Release}, }; use crate::time::Duration; type dispatch_semaphore_t = *mut crate::ffi::c_void; type dispatch_time_t = u64; const DISPATCH_TIME_NOW: dispatch_time_t = 0; const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; // Contained in libSystem.dylib, which is linked by default. extern "C" { fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t; fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t; fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize; fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize; fn dispatch_release(object: *mut crate::ffi::c_void); } const EMPTY: i8 = 0; const NOTIFIED: i8 = 1; const PARKED: i8 = -1; pub struct Parker { semaphore: dispatch_semaphore_t, state: AtomicI8, } unsafe impl Sync for Parker {} unsafe impl Send for Parker {} impl Parker { pub unsafe fn new_in_place(parker: *mut Parker) { let semaphore = dispatch_semaphore_create(0); assert!( !semaphore.is_null(), "failed to create dispatch semaphore for thread synchronization" ); parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) }) } // Does not need `Pin`, but other implementation do. pub unsafe fn park(self: Pin<&Self>) { // The semaphore counter must be zero at this point, because unparking // threads will not actually increase it until we signalled that we // are waiting. // Change NOTIFIED to EMPTY and EMPTY to PARKED. if self.state.fetch_sub(1, Acquire) == NOTIFIED { return; } // Another thread may increase the semaphore counter from this point on. // If it is faster than us, we will decrement it again immediately below. // If we are faster, we wait. // Ensure that the semaphore counter has actually been decremented, even // if the call timed out for some reason. while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} // At this point, the semaphore counter is zero again. // We were definitely woken up, so we don't need to check the state. // Still, we need to reset the state using a swap to observe the state // change with acquire ordering. self.state.swap(EMPTY, Acquire); } // Does not need `Pin`, but other implementation do. pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { if self.state.fetch_sub(1, Acquire) == NOTIFIED { return; } let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX); let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos); let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0; let state = self.state.swap(EMPTY, Acquire); if state == NOTIFIED && timeout { // If the state was NOTIFIED but semaphore_wait returned without // decrementing the count because of a timeout, it means another // thread is about to call semaphore_signal. We must wait for that // to happen to ensure the semaphore count is reset. while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} } else { // Either a timeout occurred and we reset the state before any thread // tried to wake us up, or we were woken up and reset the state, // making sure to observe the state change with acquire ordering. // Either way, the semaphore counter is now zero again. } } // Does not need `Pin`, but other implementation do. pub fn unpark(self: Pin<&Self>) { let state = self.state.swap(NOTIFIED, Release); if state == PARKED { unsafe { dispatch_semaphore_signal(self.semaphore); } } } } impl Drop for Parker { fn drop(&mut self) { // SAFETY: // We always ensure that the semaphore count is reset, so this will // never cause an exception. unsafe { dispatch_release(self.semaphore); } } }