//! Futures-powered synchronization primitives. use alloc::boxed::Box; use alloc::sync::Arc; use core::cell::UnsafeCell; use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering::SeqCst; #[cfg(feature = "bilock")] use futures_core::future::Future; use futures_core::task::{Context, Poll, Waker}; /// A type of futures-powered synchronization primitive which is a mutex between /// two possible owners. /// /// This primitive is not as generic as a full-blown mutex but is sufficient for /// many use cases where there are only two possible owners of a resource. The /// implementation of `BiLock` can be more optimized for just the two possible /// owners. /// /// Note that it's possible to use this lock through a poll-style interface with /// the `poll_lock` method but you can also use it as a future with the `lock` /// method that consumes a `BiLock` and returns a future that will resolve when /// it's locked. /// /// A `BiLock` is typically used for "split" operations where data which serves /// two purposes wants to be split into two to be worked with separately. For /// example a TCP stream could be both a reader and a writer or a framing layer /// could be both a stream and a sink for messages. A `BiLock` enables splitting /// these two and then using each independently in a futures-powered fashion. /// /// This type is only available when the `bilock` feature of this /// library is activated. #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct BiLock { arc: Arc>, } #[derive(Debug)] struct Inner { state: AtomicUsize, value: Option>, } unsafe impl Send for Inner {} unsafe impl Sync for Inner {} impl BiLock { /// Creates a new `BiLock` protecting the provided data. /// /// Two handles to the lock are returned, and these are the only two handles /// that will ever be available to the lock. These can then be sent to separate /// tasks to be managed there. /// /// The data behind the bilock is considered to be pinned, which allows `Pin` /// references to locked data. However, this means that the locked value /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`. /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); (Self { arc: arc.clone() }, Self { arc }) } /// Attempt to acquire this lock, returning `Pending` if it can't be /// acquired. /// /// This function will acquire the lock in a nonblocking fashion, returning /// immediately if the lock is already held. If the lock is successfully /// acquired then `Poll::Ready` is returned with a value that represents /// the locked value (and can be used to access the protected data). The /// lock is unlocked when the returned `BiLockGuard` is dropped. /// /// If the lock is already held then this function will return /// `Poll::Pending`. In this case the current task will also be scheduled /// to receive a notification when the lock would otherwise become /// available. /// /// # Panics /// /// This function will panic if called outside the context of a future's /// task. pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll> { let mut waker = None; loop { match self.arc.state.swap(1, SeqCst) { // Woohoo, we grabbed the lock! 0 => return Poll::Ready(BiLockGuard { bilock: self }), // Oops, someone else has locked the lock 1 => {} // A task was previously blocked on this lock, likely our task, // so we need to update that task. n => unsafe { let mut prev = Box::from_raw(n as *mut Waker); *prev = cx.waker().clone(); waker = Some(prev); }, } // type ascription for safety's sake! let me: Box = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); let me = Box::into_raw(me) as usize; match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { // The lock is still locked, but we've now parked ourselves, so // just report that we're scheduled to receive a notification. Ok(_) => return Poll::Pending, // Oops, looks like the lock was unlocked after our swap above // and before the compare_exchange. Deallocate what we just // allocated and go through the loop again. Err(0) => unsafe { waker = Some(Box::from_raw(me as *mut Waker)); }, // The top of this loop set the previous state to 1, so if we // failed the CAS above then it's because the previous value was // *not* zero or one. This indicates that a task was blocked, // but we're trying to acquire the lock and there's only one // other reference of the lock, so it should be impossible for // that task to ever block itself. Err(n) => panic!("invalid state: {}", n), } } } /// Perform a "blocking lock" of this lock, consuming this lock handle and /// returning a future to the acquired lock. /// /// This function consumes the `BiLock` and returns a sentinel future, /// `BiLockAcquire`. The returned future will resolve to /// `BiLockAcquired` which represents a locked lock similarly to /// `BiLockGuard`. /// /// Note that the returned future will never resolve to an error. #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub fn lock(&self) -> BiLockAcquire<'_, T> { BiLockAcquire { bilock: self } } /// Attempts to put the two "halves" of a `BiLock` back together and /// recover the original value. Succeeds only if the two `BiLock`s /// originated from the same call to `BiLock::new`. pub fn reunite(self, other: Self) -> Result> where T: Unpin, { if Arc::ptr_eq(&self.arc, &other.arc) { drop(other); let inner = Arc::try_unwrap(self.arc) .ok() .expect("futures: try_unwrap failed in BiLock::reunite"); Ok(unsafe { inner.into_value() }) } else { Err(ReuniteError(self, other)) } } fn unlock(&self) { match self.arc.state.swap(0, SeqCst) { // we've locked the lock, shouldn't be possible for us to see an // unlocked lock. 0 => panic!("invalid unlocked state"), // Ok, no one else tried to get the lock, we're done. 1 => {} // Another task has parked themselves on this lock, let's wake them // up as its now their turn. n => unsafe { Box::from_raw(n as *mut Waker).wake(); }, } } } impl Inner { unsafe fn into_value(mut self) -> T { self.value.take().unwrap().into_inner() } } impl Drop for Inner { fn drop(&mut self) { assert_eq!(self.state.load(SeqCst), 0); } } /// Error indicating two `BiLock`s were not two halves of a whole, and /// thus could not be `reunite`d. #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct ReuniteError(pub BiLock, pub BiLock); impl fmt::Debug for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("ReuniteError").field(&"...").finish() } } impl fmt::Display for ReuniteError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "tried to reunite two BiLocks that don't form a pair") } } #[cfg(feature = "std")] impl std::error::Error for ReuniteError {} /// Returned RAII guard from the `poll_lock` method. /// /// This structure acts as a sentinel to the data in the `BiLock` itself, /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be /// unlocked. #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub struct BiLockGuard<'a, T> { bilock: &'a BiLock, } // We allow parallel access to T via Deref, so Sync bound is also needed here. unsafe impl Sync for BiLockGuard<'_, T> {} impl Deref for BiLockGuard<'_, T> { type Target = T; fn deref(&self) -> &T { unsafe { &*self.bilock.arc.value.as_ref().unwrap().get() } } } impl DerefMut for BiLockGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.bilock.arc.value.as_ref().unwrap().get() } } } impl BiLockGuard<'_, T> { /// Get a mutable pinned reference to the locked value. pub fn as_pin_mut(&mut self) -> Pin<&mut T> { // Safety: we never allow moving a !Unpin value out of a bilock, nor // allow mutable access to it unsafe { Pin::new_unchecked(&mut *self.bilock.arc.value.as_ref().unwrap().get()) } } } impl Drop for BiLockGuard<'_, T> { fn drop(&mut self) { self.bilock.unlock(); } } /// Future returned by `BiLock::lock` which will resolve when the lock is /// acquired. #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] #[must_use = "futures do nothing unless you `.await` or poll them"] #[derive(Debug)] pub struct BiLockAcquire<'a, T> { bilock: &'a BiLock, } // Pinning is never projected to fields #[cfg(feature = "bilock")] impl Unpin for BiLockAcquire<'_, T> {} #[cfg(feature = "bilock")] impl<'a, T> Future for BiLockAcquire<'a, T> { type Output = BiLockGuard<'a, T>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.bilock.poll_lock(cx) } }