//! Definition of the Shared combinator, a future that is cloneable, //! and can be polled in multiple threads. //! //! # Examples //! //! ``` //! use futures::future::*; //! //! let future = ok::<_, bool>(6); //! let shared1 = future.shared(); //! let shared2 = shared1.clone(); //! assert_eq!(6, *shared1.wait().unwrap()); //! assert_eq!(6, *shared2.wait().unwrap()); //! ``` use {Future, Poll, Async}; use task::{self, Task}; use executor::{self, Notify, Spawn}; use std::{error, fmt, mem, ops}; use std::cell::UnsafeCell; use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::collections::HashMap; /// A future that is cloneable and can be polled in multiple threads. /// Use `Future::shared()` method to convert any future into a `Shared` future. #[must_use = "futures do nothing unless polled"] pub struct Shared { inner: Arc>, waiter: usize, } impl fmt::Debug for Shared where F: Future + fmt::Debug, F::Item: fmt::Debug, F::Error: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Shared") .field("inner", &self.inner) .field("waiter", &self.waiter) .finish() } } struct Inner { next_clone_id: AtomicUsize, future: UnsafeCell>>, result: UnsafeCell, SharedError>>>, notifier: Arc, } struct Notifier { state: AtomicUsize, waiters: Mutex>, } const IDLE: usize = 0; const POLLING: usize = 1; const COMPLETE: usize = 2; const POISONED: usize = 3; pub fn new(future: F) -> Shared { Shared { inner: Arc::new(Inner { next_clone_id: AtomicUsize::new(1), notifier: Arc::new(Notifier { state: AtomicUsize::new(IDLE), waiters: Mutex::new(HashMap::new()), }), future: UnsafeCell::new(Some(executor::spawn(future))), result: UnsafeCell::new(None), }), waiter: 0, } } impl Shared where F: Future { // TODO: make this private #[deprecated(since = "0.1.12", note = "use `Future::shared` instead")] #[cfg(feature = "with-deprecated")] #[doc(hidden)] pub fn new(future: F) -> Self { new(future) } /// If any clone of this `Shared` has completed execution, returns its result immediately /// without blocking. Otherwise, returns None without triggering the work represented by /// this `Shared`. pub fn peek(&self) -> Option, SharedError>> { match self.inner.notifier.state.load(SeqCst) { COMPLETE => { Some(unsafe { self.clone_result() }) } POISONED => panic!("inner future panicked during poll"), _ => None, } } fn set_waiter(&mut self) { let mut waiters = self.inner.notifier.waiters.lock().unwrap(); waiters.insert(self.waiter, task::current()); } unsafe fn clone_result(&self) -> Result, SharedError> { match *self.inner.result.get() { Some(Ok(ref item)) => Ok(SharedItem { item: item.item.clone() }), Some(Err(ref e)) => Err(SharedError { error: e.error.clone() }), _ => unreachable!(), } } fn complete(&self) { unsafe { *self.inner.future.get() = None }; self.inner.notifier.state.store(COMPLETE, SeqCst); self.inner.notifier.notify(0); } } impl Future for Shared where F: Future { type Item = SharedItem; type Error = SharedError; fn poll(&mut self) -> Poll { self.set_waiter(); match self.inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) { IDLE => { // Lock acquired, fall through } POLLING => { // Another task is currently polling, at this point we just want // to ensure that our task handle is currently registered return Ok(Async::NotReady); } COMPLETE => { return unsafe { self.clone_result().map(Async::Ready) }; } POISONED => panic!("inner future panicked during poll"), _ => unreachable!(), } struct Reset<'a>(&'a AtomicUsize); impl<'a> Drop for Reset<'a> { fn drop(&mut self) { use std::thread; if thread::panicking() { self.0.store(POISONED, SeqCst); } } } let _reset = Reset(&self.inner.notifier.state); // Poll the future let res = unsafe { (*self.inner.future.get()).as_mut().unwrap() .poll_future_notify(&self.inner.notifier, 0) }; match res { Ok(Async::NotReady) => { // Not ready, try to release the handle match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { POLLING => { // Success return Ok(Async::NotReady); } _ => unreachable!(), } } Ok(Async::Ready(i)) => { unsafe { (*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) })); } } Err(e) => { unsafe { (*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) })); } } } self.complete(); unsafe { self.clone_result().map(Async::Ready) } } } impl Clone for Shared where F: Future { fn clone(&self) -> Self { let next_clone_id = self.inner.next_clone_id.fetch_add(1, SeqCst); Shared { inner: self.inner.clone(), waiter: next_clone_id, } } } impl Drop for Shared where F: Future { fn drop(&mut self) { let mut waiters = self.inner.notifier.waiters.lock().unwrap(); waiters.remove(&self.waiter); } } impl Notify for Notifier { fn notify(&self, _id: usize) { let waiters = mem::replace(&mut *self.waiters.lock().unwrap(), HashMap::new()); for (_, waiter) in waiters { waiter.notify(); } } } // The `F` is synchronized by a lock, so `F` doesn't need // to be `Sync`. However, its `Item` or `Error` are exposed // through an `Arc` but not lock, so they must be `Send + Sync`. unsafe impl Send for Inner where F: Future + Send, F::Item: Send + Sync, F::Error: Send + Sync, {} unsafe impl Sync for Inner where F: Future + Send, F::Item: Send + Sync, F::Error: Send + Sync, {} impl fmt::Debug for Inner where F: Future + fmt::Debug, F::Item: fmt::Debug, F::Error: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Inner") .finish() } } /// A wrapped item of the original future that is cloneable and implements Deref /// for ease of use. #[derive(Clone, Debug)] pub struct SharedItem { item: Arc, } impl ops::Deref for SharedItem { type Target = T; fn deref(&self) -> &T { &self.item.as_ref() } } /// A wrapped error of the original future that is cloneable and implements Deref /// for ease of use. #[derive(Clone, Debug)] pub struct SharedError { error: Arc, } impl ops::Deref for SharedError { type Target = E; fn deref(&self) -> &E { &self.error.as_ref() } } impl fmt::Display for SharedError where E: fmt::Display, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.error.fmt(f) } } impl error::Error for SharedError where E: error::Error, { #[allow(deprecated)] fn description(&self) -> &str { self.error.description() } #[allow(deprecated)] fn cause(&self) -> Option<&error::Error> { self.error.cause() } }