summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/future/shared.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/future/shared.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/future/shared.rs300
1 files changed, 300 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/future/shared.rs b/third_party/rust/futures-0.1.31/src/future/shared.rs
new file mode 100644
index 0000000000..e3b6d2fca7
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/future/shared.rs
@@ -0,0 +1,300 @@
+//! Definition of the Shared combinator, a future that is cloneable,
+//! and can be polled in multiple threads.
+//!
+//! # Examples
+//!
+//! ```
+//! use futures::future::*;
+//!
+//! let future = ok::<_, bool>(6);
+//! let shared1 = future.shared();
+//! let shared2 = shared1.clone();
+//! assert_eq!(6, *shared1.wait().unwrap());
+//! assert_eq!(6, *shared2.wait().unwrap());
+//! ```
+
+use {Future, Poll, Async};
+use task::{self, Task};
+use executor::{self, Notify, Spawn};
+
+use std::{error, fmt, mem, ops};
+use std::cell::UnsafeCell;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::collections::HashMap;
+
+/// A future that is cloneable and can be polled in multiple threads.
+/// Use `Future::shared()` method to convert any future into a `Shared` future.
+#[must_use = "futures do nothing unless polled"]
+pub struct Shared<F: Future> {
+ inner: Arc<Inner<F>>,
+ waiter: usize,
+}
+
+impl<F> fmt::Debug for Shared<F>
+ where F: Future + fmt::Debug,
+ F::Item: fmt::Debug,
+ F::Error: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Shared")
+ .field("inner", &self.inner)
+ .field("waiter", &self.waiter)
+ .finish()
+ }
+}
+
+struct Inner<F: Future> {
+ next_clone_id: AtomicUsize,
+ future: UnsafeCell<Option<Spawn<F>>>,
+ result: UnsafeCell<Option<Result<SharedItem<F::Item>, SharedError<F::Error>>>>,
+ notifier: Arc<Notifier>,
+}
+
+struct Notifier {
+ state: AtomicUsize,
+ waiters: Mutex<HashMap<usize, Task>>,
+}
+
+const IDLE: usize = 0;
+const POLLING: usize = 1;
+const COMPLETE: usize = 2;
+const POISONED: usize = 3;
+
+pub fn new<F: Future>(future: F) -> Shared<F> {
+ Shared {
+ inner: Arc::new(Inner {
+ next_clone_id: AtomicUsize::new(1),
+ notifier: Arc::new(Notifier {
+ state: AtomicUsize::new(IDLE),
+ waiters: Mutex::new(HashMap::new()),
+ }),
+ future: UnsafeCell::new(Some(executor::spawn(future))),
+ result: UnsafeCell::new(None),
+ }),
+ waiter: 0,
+ }
+}
+
+impl<F> Shared<F> where F: Future {
+ // TODO: make this private
+ #[deprecated(since = "0.1.12", note = "use `Future::shared` instead")]
+ #[cfg(feature = "with-deprecated")]
+ #[doc(hidden)]
+ pub fn new(future: F) -> Self {
+ new(future)
+ }
+
+ /// If any clone of this `Shared` has completed execution, returns its result immediately
+ /// without blocking. Otherwise, returns None without triggering the work represented by
+ /// this `Shared`.
+ pub fn peek(&self) -> Option<Result<SharedItem<F::Item>, SharedError<F::Error>>> {
+ match self.inner.notifier.state.load(SeqCst) {
+ COMPLETE => {
+ Some(unsafe { self.clone_result() })
+ }
+ POISONED => panic!("inner future panicked during poll"),
+ _ => None,
+ }
+ }
+
+ fn set_waiter(&mut self) {
+ let mut waiters = self.inner.notifier.waiters.lock().unwrap();
+ waiters.insert(self.waiter, task::current());
+ }
+
+ unsafe fn clone_result(&self) -> Result<SharedItem<F::Item>, SharedError<F::Error>> {
+ match *self.inner.result.get() {
+ Some(Ok(ref item)) => Ok(SharedItem { item: item.item.clone() }),
+ Some(Err(ref e)) => Err(SharedError { error: e.error.clone() }),
+ _ => unreachable!(),
+ }
+ }
+
+ fn complete(&self) {
+ unsafe { *self.inner.future.get() = None };
+ self.inner.notifier.state.store(COMPLETE, SeqCst);
+ self.inner.notifier.notify(0);
+ }
+}
+
+impl<F> Future for Shared<F>
+ where F: Future
+{
+ type Item = SharedItem<F::Item>;
+ type Error = SharedError<F::Error>;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ self.set_waiter();
+
+ match self.inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) {
+ IDLE => {
+ // Lock acquired, fall through
+ }
+ POLLING => {
+ // Another task is currently polling, at this point we just want
+ // to ensure that our task handle is currently registered
+
+ return Ok(Async::NotReady);
+ }
+ COMPLETE => {
+ return unsafe { self.clone_result().map(Async::Ready) };
+ }
+ POISONED => panic!("inner future panicked during poll"),
+ _ => unreachable!(),
+ }
+
+ struct Reset<'a>(&'a AtomicUsize);
+
+ impl<'a> Drop for Reset<'a> {
+ fn drop(&mut self) {
+ use std::thread;
+
+ if thread::panicking() {
+ self.0.store(POISONED, SeqCst);
+ }
+ }
+ }
+
+ let _reset = Reset(&self.inner.notifier.state);
+
+ // Poll the future
+ let res = unsafe {
+ (*self.inner.future.get()).as_mut().unwrap()
+ .poll_future_notify(&self.inner.notifier, 0)
+ };
+ match res {
+ Ok(Async::NotReady) => {
+ // Not ready, try to release the handle
+ match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
+ POLLING => {
+ // Success
+ return Ok(Async::NotReady);
+ }
+ _ => unreachable!(),
+ }
+
+ }
+ Ok(Async::Ready(i)) => {
+ unsafe {
+ (*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) }));
+ }
+ }
+ Err(e) => {
+ unsafe {
+ (*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) }));
+ }
+ }
+ }
+
+ self.complete();
+ unsafe { self.clone_result().map(Async::Ready) }
+ }
+}
+
+impl<F> Clone for Shared<F> where F: Future {
+ fn clone(&self) -> Self {
+ let next_clone_id = self.inner.next_clone_id.fetch_add(1, SeqCst);
+
+ Shared {
+ inner: self.inner.clone(),
+ waiter: next_clone_id,
+ }
+ }
+}
+
+impl<F> Drop for Shared<F> where F: Future {
+ fn drop(&mut self) {
+ let mut waiters = self.inner.notifier.waiters.lock().unwrap();
+ waiters.remove(&self.waiter);
+ }
+}
+
+impl Notify for Notifier {
+ fn notify(&self, _id: usize) {
+ let waiters = mem::replace(&mut *self.waiters.lock().unwrap(), HashMap::new());
+
+ for (_, waiter) in waiters {
+ waiter.notify();
+ }
+ }
+}
+
+// The `F` is synchronized by a lock, so `F` doesn't need
+// to be `Sync`. However, its `Item` or `Error` are exposed
+// through an `Arc` but not lock, so they must be `Send + Sync`.
+unsafe impl<F> Send for Inner<F>
+ where F: Future + Send,
+ F::Item: Send + Sync,
+ F::Error: Send + Sync,
+{}
+
+unsafe impl<F> Sync for Inner<F>
+ where F: Future + Send,
+ F::Item: Send + Sync,
+ F::Error: Send + Sync,
+{}
+
+impl<F> fmt::Debug for Inner<F>
+ where F: Future + fmt::Debug,
+ F::Item: fmt::Debug,
+ F::Error: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Inner")
+ .finish()
+ }
+}
+
+/// A wrapped item of the original future that is cloneable and implements Deref
+/// for ease of use.
+#[derive(Clone, Debug)]
+pub struct SharedItem<T> {
+ item: Arc<T>,
+}
+
+impl<T> ops::Deref for SharedItem<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ &self.item.as_ref()
+ }
+}
+
+/// A wrapped error of the original future that is cloneable and implements Deref
+/// for ease of use.
+#[derive(Clone, Debug)]
+pub struct SharedError<E> {
+ error: Arc<E>,
+}
+
+impl<E> ops::Deref for SharedError<E> {
+ type Target = E;
+
+ fn deref(&self) -> &E {
+ &self.error.as_ref()
+ }
+}
+
+impl<E> fmt::Display for SharedError<E>
+ where E: fmt::Display,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.error.fmt(f)
+ }
+}
+
+impl<E> error::Error for SharedError<E>
+ where E: error::Error,
+{
+ #[allow(deprecated)]
+ fn description(&self) -> &str {
+ self.error.description()
+ }
+
+ #[allow(deprecated)]
+ fn cause(&self) -> Option<&error::Error> {
+ self.error.cause()
+ }
+}