summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/sync
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/sync
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/sync')
-rw-r--r--third_party/rust/futures-0.1.31/src/sync/bilock.rs298
-rw-r--r--third_party/rust/futures-0.1.31/src/sync/mod.rs17
-rw-r--r--third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs1187
-rw-r--r--third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs151
-rw-r--r--third_party/rust/futures-0.1.31/src/sync/oneshot.rs611
5 files changed, 2264 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/sync/bilock.rs b/third_party/rust/futures-0.1.31/src/sync/bilock.rs
new file mode 100644
index 0000000000..af9e1eeb2c
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/sync/bilock.rs
@@ -0,0 +1,298 @@
+use std::any::Any;
+use std::boxed::Box;
+use std::cell::UnsafeCell;
+use std::error::Error;
+use std::fmt;
+use std::mem;
+use std::ops::{Deref, DerefMut};
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+use {Async, Future, Poll};
+use task::{self, Task};
+
+/// A type of futures-powered synchronization primitive which is a mutex between
+/// two possible owners.
+///
+/// This primitive is not as generic as a full-blown mutex but is sufficient for
+/// many use cases where there are only two possible owners of a resource. The
+/// implementation of `BiLock` can be more optimized for just the two possible
+/// owners.
+///
+/// Note that it's possible to use this lock through a poll-style interface with
+/// the `poll_lock` method but you can also use it as a future with the `lock`
+/// method that consumes a `BiLock` and returns a future that will resolve when
+/// it's locked.
+///
+/// A `BiLock` is typically used for "split" operations where data which serves
+/// two purposes wants to be split into two to be worked with separately. For
+/// example a TCP stream could be both a reader and a writer or a framing layer
+/// could be both a stream and a sink for messages. A `BiLock` enables splitting
+/// these two and then using each independently in a futures-powered fashion.
+#[derive(Debug)]
+pub struct BiLock<T> {
+ inner: Arc<Inner<T>>,
+}
+
+#[derive(Debug)]
+struct Inner<T> {
+ state: AtomicUsize,
+ inner: Option<UnsafeCell<T>>,
+}
+
+unsafe impl<T: Send> Send for Inner<T> {}
+unsafe impl<T: Send> Sync for Inner<T> {}
+
+impl<T> BiLock<T> {
+ /// Creates a new `BiLock` protecting the provided data.
+ ///
+ /// Two handles to the lock are returned, and these are the only two handles
+ /// that will ever be available to the lock. These can then be sent to separate
+ /// tasks to be managed there.
+ pub fn new(t: T) -> (BiLock<T>, BiLock<T>) {
+ let inner = Arc::new(Inner {
+ state: AtomicUsize::new(0),
+ inner: Some(UnsafeCell::new(t)),
+ });
+
+ (BiLock { inner: inner.clone() }, BiLock { inner: inner })
+ }
+
+ /// Attempt to acquire this lock, returning `NotReady` if it can't be
+ /// acquired.
+ ///
+ /// This function will acquire the lock in a nonblocking fashion, returning
+ /// immediately if the lock is already held. If the lock is successfully
+ /// acquired then `Async::Ready` is returned with a value that represents
+ /// the locked value (and can be used to access the protected data). The
+ /// lock is unlocked when the returned `BiLockGuard` is dropped.
+ ///
+ /// If the lock is already held then this function will return
+ /// `Async::NotReady`. In this case the current task will also be scheduled
+ /// to receive a notification when the lock would otherwise become
+ /// available.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if called outside the context of a future's
+ /// task.
+ pub fn poll_lock(&self) -> Async<BiLockGuard<T>> {
+ loop {
+ match self.inner.state.swap(1, SeqCst) {
+ // Woohoo, we grabbed the lock!
+ 0 => return Async::Ready(BiLockGuard { inner: self }),
+
+ // Oops, someone else has locked the lock
+ 1 => {}
+
+ // A task was previously blocked on this lock, likely our task,
+ // so we need to update that task.
+ n => unsafe {
+ drop(Box::from_raw(n as *mut Task));
+ }
+ }
+
+ let me = Box::new(task::current());
+ let me = Box::into_raw(me) as usize;
+
+ match self.inner.state.compare_exchange(1, me, SeqCst, SeqCst) {
+ // The lock is still locked, but we've now parked ourselves, so
+ // just report that we're scheduled to receive a notification.
+ Ok(_) => return Async::NotReady,
+
+ // Oops, looks like the lock was unlocked after our swap above
+ // and before the compare_exchange. Deallocate what we just
+ // allocated and go through the loop again.
+ Err(0) => unsafe {
+ drop(Box::from_raw(me as *mut Task));
+ },
+
+ // The top of this loop set the previous state to 1, so if we
+ // failed the CAS above then it's because the previous value was
+ // *not* zero or one. This indicates that a task was blocked,
+ // but we're trying to acquire the lock and there's only one
+ // other reference of the lock, so it should be impossible for
+ // that task to ever block itself.
+ Err(n) => panic!("invalid state: {}", n),
+ }
+ }
+ }
+
+ /// Perform a "blocking lock" of this lock, consuming this lock handle and
+ /// returning a future to the acquired lock.
+ ///
+ /// This function consumes the `BiLock<T>` and returns a sentinel future,
+ /// `BiLockAcquire<T>`. The returned future will resolve to
+ /// `BiLockAcquired<T>` which represents a locked lock similarly to
+ /// `BiLockGuard<T>`.
+ ///
+ /// Note that the returned future will never resolve to an error.
+ pub fn lock(self) -> BiLockAcquire<T> {
+ BiLockAcquire {
+ inner: Some(self),
+ }
+ }
+
+ /// Attempts to put the two "halves" of a `BiLock<T>` back together and
+ /// recover the original value. Succeeds only if the two `BiLock<T>`s
+ /// originated from the same call to `BiLock::new`.
+ pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>> {
+ if &*self.inner as *const _ == &*other.inner as *const _ {
+ drop(other);
+ let inner = Arc::try_unwrap(self.inner)
+ .ok()
+ .expect("futures: try_unwrap failed in BiLock<T>::reunite");
+ Ok(unsafe { inner.into_inner() })
+ } else {
+ Err(ReuniteError(self, other))
+ }
+ }
+
+ fn unlock(&self) {
+ match self.inner.state.swap(0, SeqCst) {
+ // we've locked the lock, shouldn't be possible for us to see an
+ // unlocked lock.
+ 0 => panic!("invalid unlocked state"),
+
+ // Ok, no one else tried to get the lock, we're done.
+ 1 => {}
+
+ // Another task has parked themselves on this lock, let's wake them
+ // up as its now their turn.
+ n => unsafe {
+ Box::from_raw(n as *mut Task).notify();
+ }
+ }
+ }
+}
+
+impl<T> Inner<T> {
+ unsafe fn into_inner(mut self) -> T {
+ mem::replace(&mut self.inner, None).unwrap().into_inner()
+ }
+}
+
+impl<T> Drop for Inner<T> {
+ fn drop(&mut self) {
+ assert_eq!(self.state.load(SeqCst), 0);
+ }
+}
+
+/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
+/// thus could not be `reunite`d.
+pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
+
+impl<T> fmt::Debug for ReuniteError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("ReuniteError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for ReuniteError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "tried to reunite two BiLocks that don't form a pair")
+ }
+}
+
+impl<T: Any> Error for ReuniteError<T> {
+ fn description(&self) -> &str {
+ "tried to reunite two BiLocks that don't form a pair"
+ }
+}
+
+/// Returned RAII guard from the `poll_lock` method.
+///
+/// This structure acts as a sentinel to the data in the `BiLock<T>` itself,
+/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
+/// unlocked.
+#[derive(Debug)]
+pub struct BiLockGuard<'a, T: 'a> {
+ inner: &'a BiLock<T>,
+}
+
+impl<'a, T> Deref for BiLockGuard<'a, T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ unsafe { &*self.inner.inner.inner.as_ref().unwrap().get() }
+ }
+}
+
+impl<'a, T> DerefMut for BiLockGuard<'a, T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.inner.inner.inner.as_ref().unwrap().get() }
+ }
+}
+
+impl<'a, T> Drop for BiLockGuard<'a, T> {
+ fn drop(&mut self) {
+ self.inner.unlock();
+ }
+}
+
+/// Future returned by `BiLock::lock` which will resolve when the lock is
+/// acquired.
+#[derive(Debug)]
+pub struct BiLockAcquire<T> {
+ inner: Option<BiLock<T>>,
+}
+
+impl<T> Future for BiLockAcquire<T> {
+ type Item = BiLockAcquired<T>;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<BiLockAcquired<T>, ()> {
+ match self.inner.as_ref().expect("cannot poll after Ready").poll_lock() {
+ Async::Ready(r) => {
+ mem::forget(r);
+ }
+ Async::NotReady => return Ok(Async::NotReady),
+ }
+ Ok(Async::Ready(BiLockAcquired { inner: self.inner.take() }))
+ }
+}
+
+/// Resolved value of the `BiLockAcquire<T>` future.
+///
+/// This value, like `BiLockGuard<T>`, is a sentinel to the value `T` through
+/// implementations of `Deref` and `DerefMut`. When dropped will unlock the
+/// lock, and the original unlocked `BiLock<T>` can be recovered through the
+/// `unlock` method.
+#[derive(Debug)]
+pub struct BiLockAcquired<T> {
+ inner: Option<BiLock<T>>,
+}
+
+impl<T> BiLockAcquired<T> {
+ /// Recovers the original `BiLock<T>`, unlocking this lock.
+ pub fn unlock(mut self) -> BiLock<T> {
+ let bi_lock = self.inner.take().unwrap();
+
+ bi_lock.unlock();
+
+ bi_lock
+ }
+}
+
+impl<T> Deref for BiLockAcquired<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ unsafe { &*self.inner.as_ref().unwrap().inner.inner.as_ref().unwrap().get() }
+ }
+}
+
+impl<T> DerefMut for BiLockAcquired<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.inner.as_mut().unwrap().inner.inner.as_ref().unwrap().get() }
+ }
+}
+
+impl<T> Drop for BiLockAcquired<T> {
+ fn drop(&mut self) {
+ if let Some(ref bi_lock) = self.inner {
+ bi_lock.unlock();
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/sync/mod.rs b/third_party/rust/futures-0.1.31/src/sync/mod.rs
new file mode 100644
index 0000000000..0a46e9afbe
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/sync/mod.rs
@@ -0,0 +1,17 @@
+//! Future-aware synchronization
+//!
+//! This module, which is modeled after `std::sync`, contains user-space
+//! synchronization tools that work with futures, streams and sinks. In
+//! particular, these synchronizers do *not* block physical OS threads, but
+//! instead work at the task level.
+//!
+//! More information and examples of how to use these synchronization primitives
+//! can be found [online at tokio.rs].
+//!
+//! [online at tokio.rs]: https://tokio.rs/docs/going-deeper-futures/synchronization/
+
+pub mod oneshot;
+pub mod mpsc;
+mod bilock;
+
+pub use self::bilock::{BiLock, BiLockGuard, BiLockAcquire, BiLockAcquired};
diff --git a/third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs b/third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs
new file mode 100644
index 0000000000..31d2320ab6
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs
@@ -0,0 +1,1187 @@
+//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
+//!
+//! A channel can be used as a communication primitive between tasks running on
+//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
+//! handles. `Receiver` implements `Stream` and allows a task to read values
+//! out of the channel. If there is no message to read from the channel, the
+//! current task will be notified when a new value is sent. `Sender` implements
+//! the `Sink` trait and allows a task to send messages into the channel. If
+//! the channel is at capacity, then send will be rejected and the task will be
+//! notified when additional capacity is available.
+//!
+//! # Disconnection
+//!
+//! When all `Sender` handles have been dropped, it is no longer possible to
+//! send values into the channel. This is considered the termination event of
+//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
+//!
+//! If the receiver handle is dropped, then messages can no longer be read out
+//! of the channel. In this case, a `send` will result in an error.
+//!
+//! # Clean Shutdown
+//!
+//! If the `Receiver` is simply dropped, then it is possible for there to be
+//! messages still in the channel that will not be processed. As such, it is
+//! usually desirable to perform a "clean" shutdown. To do this, the receiver
+//! will first call `close`, which will prevent any further messages to be sent
+//! into the channel. Then, the receiver consumes the channel to completion, at
+//! which point the receiver can be dropped.
+
+// At the core, the channel uses an atomic FIFO queue for message passing. This
+// queue is used as the primary coordination primitive. In order to enforce
+// capacity limits and handle back pressure, a secondary FIFO queue is used to
+// send parked task handles.
+//
+// The general idea is that the channel is created with a `buffer` size of `n`.
+// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
+// slot to hold a message. This allows `Sender` to know for a fact that a send
+// will succeed *before* starting to do the actual work of sending the value.
+// Since most of this work is lock-free, once the work starts, it is impossible
+// to safely revert.
+//
+// If the sender is unable to process a send operation, then the current
+// task is parked and the handle is sent on the parked task queue.
+//
+// Note that the implementation guarantees that the channel capacity will never
+// exceed the configured limit, however there is no *strict* guarantee that the
+// receiver will wake up a parked task *immediately* when a slot becomes
+// available. However, it will almost always unpark a task when a slot becomes
+// available and it is *guaranteed* that a sender will be unparked when the
+// message that caused the sender to become parked is read out of the channel.
+//
+// The steps for sending a message are roughly:
+//
+// 1) Increment the channel message count
+// 2) If the channel is at capacity, push the task handle onto the wait queue
+// 3) Push the message onto the message queue.
+//
+// The steps for receiving a message are roughly:
+//
+// 1) Pop a message from the message queue
+// 2) Pop a task handle from the wait queue
+// 3) Decrement the channel message count.
+//
+// It's important for the order of operations on lock-free structures to happen
+// in reverse order between the sender and receiver. This makes the message
+// queue the primary coordination structure and establishes the necessary
+// happens-before semantics required for the acquire / release semantics used
+// by the queue structure.
+
+use std::fmt;
+use std::error::Error;
+use std::any::Any;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::usize;
+
+use sync::mpsc::queue::{Queue, PopResult};
+use sync::oneshot;
+use task::{self, Task};
+use future::Executor;
+use sink::SendAll;
+use resultstream::{self, Results};
+use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
+
+mod queue;
+
+/// The transmission end of a channel which is used to send values.
+///
+/// This is created by the `channel` method.
+#[derive(Debug)]
+pub struct Sender<T> {
+ // Channel state shared between the sender and receiver.
+ inner: Arc<Inner<T>>,
+
+ // Handle to the task that is blocked on this sender. This handle is sent
+ // to the receiver half in order to be notified when the sender becomes
+ // unblocked.
+ sender_task: Arc<Mutex<SenderTask>>,
+
+ // True if the sender might be blocked. This is an optimization to avoid
+ // having to lock the mutex most of the time.
+ maybe_parked: bool,
+}
+
+/// The transmission end of a channel which is used to send values.
+///
+/// This is created by the `unbounded` method.
+#[derive(Debug)]
+pub struct UnboundedSender<T>(Sender<T>);
+
+trait AssertKinds: Send + Sync + Clone {}
+impl AssertKinds for UnboundedSender<u32> {}
+
+
+/// The receiving end of a channel which implements the `Stream` trait.
+///
+/// This is a concrete implementation of a stream which can be used to represent
+/// a stream of values being computed elsewhere. This is created by the
+/// `channel` method.
+#[derive(Debug)]
+pub struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+/// The receiving end of a channel which implements the `Stream` trait.
+///
+/// This is a concrete implementation of a stream which can be used to represent
+/// a stream of values being computed elsewhere. This is created by the
+/// `unbounded` method.
+#[derive(Debug)]
+pub struct UnboundedReceiver<T>(Receiver<T>);
+
+/// Error type for sending, used when the receiving end of a channel is
+/// dropped
+#[derive(Clone, PartialEq, Eq)]
+pub struct SendError<T>(T);
+
+/// Error type returned from `try_send`
+#[derive(Clone, PartialEq, Eq)]
+pub struct TrySendError<T> {
+ kind: TrySendErrorKind<T>,
+}
+
+#[derive(Clone, PartialEq, Eq)]
+enum TrySendErrorKind<T> {
+ Full(T),
+ Disconnected(T),
+}
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("SendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "send failed because receiver is gone")
+ }
+}
+
+impl<T: Any> Error for SendError<T>
+{
+ fn description(&self) -> &str {
+ "send failed because receiver is gone"
+ }
+}
+
+impl<T> SendError<T> {
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("TrySendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ if self.is_full() {
+ write!(fmt, "send failed because channel is full")
+ } else {
+ write!(fmt, "send failed because receiver is gone")
+ }
+ }
+}
+
+impl<T: Any> Error for TrySendError<T> {
+ fn description(&self) -> &str {
+ if self.is_full() {
+ "send failed because channel is full"
+ } else {
+ "send failed because receiver is gone"
+ }
+ }
+}
+
+impl<T> TrySendError<T> {
+ /// Returns true if this error is a result of the channel being full
+ pub fn is_full(&self) -> bool {
+ use self::TrySendErrorKind::*;
+
+ match self.kind {
+ Full(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if this error is a result of the receiver being dropped
+ pub fn is_disconnected(&self) -> bool {
+ use self::TrySendErrorKind::*;
+
+ match self.kind {
+ Disconnected(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ use self::TrySendErrorKind::*;
+
+ match self.kind {
+ Full(v) | Disconnected(v) => v,
+ }
+ }
+}
+
+#[derive(Debug)]
+struct Inner<T> {
+ // Max buffer size of the channel. If `None` then the channel is unbounded.
+ buffer: Option<usize>,
+
+ // Internal channel state. Consists of the number of messages stored in the
+ // channel as well as a flag signalling that the channel is closed.
+ state: AtomicUsize,
+
+ // Atomic, FIFO queue used to send messages to the receiver
+ message_queue: Queue<Option<T>>,
+
+ // Atomic, FIFO queue used to send parked task handles to the receiver.
+ parked_queue: Queue<Arc<Mutex<SenderTask>>>,
+
+ // Number of senders in existence
+ num_senders: AtomicUsize,
+
+ // Handle to the receiver's task.
+ recv_task: Mutex<ReceiverTask>,
+}
+
+// Struct representation of `Inner::state`.
+#[derive(Debug, Clone, Copy)]
+struct State {
+ // `true` when the channel is open
+ is_open: bool,
+
+ // Number of messages in the channel
+ num_messages: usize,
+}
+
+#[derive(Debug)]
+struct ReceiverTask {
+ unparked: bool,
+ task: Option<Task>,
+}
+
+// Returned from Receiver::try_park()
+enum TryPark {
+ Parked,
+ Closed,
+ NotEmpty,
+}
+
+// The `is_open` flag is stored in the left-most bit of `Inner::state`
+const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
+
+// When a new channel is created, it is created in the open state with no
+// pending messages.
+const INIT_STATE: usize = OPEN_MASK;
+
+// The maximum number of messages that a channel can track is `usize::MAX >> 1`
+const MAX_CAPACITY: usize = !(OPEN_MASK);
+
+// The maximum requested buffer size must be less than the maximum capacity of
+// a channel. This is because each sender gets a guaranteed slot.
+const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
+
+// Sent to the consumer to wake up blocked producers
+#[derive(Debug)]
+struct SenderTask {
+ task: Option<Task>,
+ is_parked: bool,
+}
+
+impl SenderTask {
+ fn new() -> Self {
+ SenderTask {
+ task: None,
+ is_parked: false,
+ }
+ }
+
+ fn notify(&mut self) {
+ self.is_parked = false;
+
+ if let Some(task) = self.task.take() {
+ task.notify();
+ }
+ }
+}
+
+/// Creates an in-memory channel implementation of the `Stream` trait with
+/// bounded capacity.
+///
+/// This method creates a concrete implementation of the `Stream` trait which
+/// can be used to send values across threads in a streaming fashion. This
+/// channel is unique in that it implements back pressure to ensure that the
+/// sender never outpaces the receiver. The channel capacity is equal to
+/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
+/// in the channel capacity, and on top of that there are `buffer` "first come,
+/// first serve" slots available to all senders.
+///
+/// The `Receiver` returned implements the `Stream` trait and has access to any
+/// number of the associated combinators for transforming the result.
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ // Check that the requested buffer size does not exceed the maximum buffer
+ // size permitted by the system.
+ assert!(buffer < MAX_BUFFER, "requested buffer size too large");
+ channel2(Some(buffer))
+}
+
+/// Creates an in-memory channel implementation of the `Stream` trait with
+/// unbounded capacity.
+///
+/// This method creates a concrete implementation of the `Stream` trait which
+/// can be used to send values across threads in a streaming fashion. A `send`
+/// on this channel will always succeed as long as the receive half has not
+/// been closed. If the receiver falls behind, messages will be buffered
+/// internally.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let (tx, rx) = channel2(None);
+ (UnboundedSender(tx), UnboundedReceiver(rx))
+}
+
+fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner {
+ buffer: buffer,
+ state: AtomicUsize::new(INIT_STATE),
+ message_queue: Queue::new(),
+ parked_queue: Queue::new(),
+ num_senders: AtomicUsize::new(1),
+ recv_task: Mutex::new(ReceiverTask {
+ unparked: false,
+ task: None,
+ }),
+ });
+
+ let tx = Sender {
+ inner: inner.clone(),
+ sender_task: Arc::new(Mutex::new(SenderTask::new())),
+ maybe_parked: false,
+ };
+
+ let rx = Receiver {
+ inner: inner,
+ };
+
+ (tx, rx)
+}
+
+/*
+ *
+ * ===== impl Sender =====
+ *
+ */
+
+impl<T> Sender<T> {
+ /// Attempts to send a message on this `Sender<T>` without blocking.
+ ///
+ /// This function, unlike `start_send`, is safe to call whether it's being
+ /// called on a task or not. Note that this function, however, will *not*
+ /// attempt to block the current task if the message cannot be sent.
+ ///
+ /// It is not recommended to call this function from inside of a future,
+ /// only from an external thread where you've otherwise arranged to be
+ /// notified when the channel is no longer full.
+ pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+ // If the sender is currently blocked, reject the message
+ if !self.poll_unparked(false).is_ready() {
+ return Err(TrySendError {
+ kind: TrySendErrorKind::Full(msg),
+ });
+ }
+
+ // The channel has capacity to accept the message, so send it
+ self.do_send(Some(msg), false)
+ .map_err(|SendError(v)| {
+ TrySendError {
+ kind: TrySendErrorKind::Disconnected(v),
+ }
+ })
+ }
+
+ // Do the send without failing
+ // None means close
+ fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
+ // First, increment the number of messages contained by the channel.
+ // This operation will also atomically determine if the sender task
+ // should be parked.
+ //
+ // None is returned in the case that the channel has been closed by the
+ // receiver. This happens when `Receiver::close` is called or the
+ // receiver is dropped.
+ let park_self = match self.inc_num_messages(msg.is_none()) {
+ Some(park_self) => park_self,
+ None => {
+ // The receiver has closed the channel. Only abort if actually
+ // sending a message. It is important that the stream
+ // termination (None) is always sent. This technically means
+ // that it is possible for the queue to contain the following
+ // number of messages:
+ //
+ // num-senders + buffer + 1
+ //
+ if let Some(msg) = msg {
+ return Err(SendError(msg));
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ // If the channel has reached capacity, then the sender task needs to
+ // be parked. This will send the task handle on the parked task queue.
+ //
+ // However, when `do_send` is called while dropping the `Sender`,
+ // `task::current()` can't be called safely. In this case, in order to
+ // maintain internal consistency, a blank message is pushed onto the
+ // parked task queue.
+ if park_self {
+ self.park(do_park);
+ }
+
+ self.queue_push_and_signal(msg);
+
+ Ok(())
+ }
+
+ // Do the send without parking current task.
+ //
+ // To be called from unbounded sender.
+ fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
+ match self.inc_num_messages(false) {
+ Some(park_self) => assert!(!park_self),
+ None => return Err(SendError(msg)),
+ };
+
+ self.queue_push_and_signal(Some(msg));
+
+ Ok(())
+ }
+
+ // Push message to the queue and signal to the receiver
+ fn queue_push_and_signal(&self, msg: Option<T>) {
+ // Push the message onto the message queue
+ self.inner.message_queue.push(msg);
+
+ // Signal to the receiver that a message has been enqueued. If the
+ // receiver is parked, this will unpark the task.
+ self.signal();
+ }
+
+ // Increment the number of queued messages. Returns if the sender should
+ // block.
+ fn inc_num_messages(&self, close: bool) -> Option<bool> {
+ let mut curr = self.inner.state.load(SeqCst);
+
+ loop {
+ let mut state = decode_state(curr);
+
+ // The receiver end closed the channel.
+ if !state.is_open {
+ return None;
+ }
+
+ // This probably is never hit? Odds are the process will run out of
+ // memory first. It may be worth to return something else in this
+ // case?
+ assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
+ sending this messages would overflow the state");
+
+ state.num_messages += 1;
+
+ // The channel is closed by all sender handles being dropped.
+ if close {
+ state.is_open = false;
+ }
+
+ let next = encode_state(&state);
+ match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => {
+ // Block if the current number of pending messages has exceeded
+ // the configured buffer size
+ let park_self = match self.inner.buffer {
+ Some(buffer) => state.num_messages > buffer,
+ None => false,
+ };
+
+ return Some(park_self)
+ }
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+
+ // Signal to the receiver task that a message has been enqueued
+ fn signal(&self) {
+ // TODO
+ // This logic can probably be improved by guarding the lock with an
+ // atomic.
+ //
+ // Do this step first so that the lock is dropped when
+ // `unpark` is called
+ let task = {
+ let mut recv_task = self.inner.recv_task.lock().unwrap();
+
+ // If the receiver has already been unparked, then there is nothing
+ // more to do
+ if recv_task.unparked {
+ return;
+ }
+
+ // Setting this flag enables the receiving end to detect that
+ // an unpark event happened in order to avoid unnecessarily
+ // parking.
+ recv_task.unparked = true;
+ recv_task.task.take()
+ };
+
+ if let Some(task) = task {
+ task.notify();
+ }
+ }
+
+ fn park(&mut self, can_park: bool) {
+ // TODO: clean up internal state if the task::current will fail
+
+ let task = if can_park {
+ Some(task::current())
+ } else {
+ None
+ };
+
+ {
+ let mut sender = self.sender_task.lock().unwrap();
+ sender.task = task;
+ sender.is_parked = true;
+ }
+
+ // Send handle over queue
+ let t = self.sender_task.clone();
+ self.inner.parked_queue.push(t);
+
+ // Check to make sure we weren't closed after we sent our task on the
+ // queue
+ let state = decode_state(self.inner.state.load(SeqCst));
+ self.maybe_parked = state.is_open;
+ }
+
+ /// Polls the channel to determine if there is guaranteed to be capacity to send at least one
+ /// item without waiting.
+ ///
+ /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
+ /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
+ /// `Err(SendError(_))` if the receiver has been dropped.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if called from outside the context of a task or future.
+ pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
+ let state = decode_state(self.inner.state.load(SeqCst));
+ if !state.is_open {
+ return Err(SendError(()));
+ }
+
+ Ok(self.poll_unparked(true))
+ }
+
+ /// Returns whether this channel is closed without needing a context.
+ pub fn is_closed(&self) -> bool {
+ !decode_state(self.inner.state.load(SeqCst)).is_open
+ }
+
+ fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
+ // First check the `maybe_parked` variable. This avoids acquiring the
+ // lock in most cases
+ if self.maybe_parked {
+ // Get a lock on the task handle
+ let mut task = self.sender_task.lock().unwrap();
+
+ if !task.is_parked {
+ self.maybe_parked = false;
+ return Async::Ready(())
+ }
+
+ // At this point, an unpark request is pending, so there will be an
+ // unpark sometime in the future. We just need to make sure that
+ // the correct task will be notified.
+ //
+ // Update the task in case the `Sender` has been moved to another
+ // task
+ task.task = if do_park {
+ Some(task::current())
+ } else {
+ None
+ };
+
+ Async::NotReady
+ } else {
+ Async::Ready(())
+ }
+ }
+}
+
+impl<T> Sink for Sender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ // If the sender is currently blocked, reject the message before doing
+ // any work.
+ if !self.poll_unparked(true).is_ready() {
+ return Ok(AsyncSink::NotReady(msg));
+ }
+
+ // The channel has capacity to accept the message, so send it.
+ self.do_send(Some(msg), true)?;
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ self.poll_ready()
+ // At this point, the value cannot be returned and `SendError`
+ // cannot be created with a `T` without breaking backwards
+ // comptibility. This means we cannot return an error.
+ //
+ // That said, there is also no guarantee that a `poll_complete`
+ // returning `Ok` implies the receiver sees the message.
+ .or_else(|_| Ok(().into()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ /// Returns whether this channel is closed without needing a context.
+ pub fn is_closed(&self) -> bool {
+ self.0.is_closed()
+ }
+
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ #[deprecated(note = "renamed to `unbounded_send`")]
+ #[doc(hidden)]
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.unbounded_send(msg)
+ }
+
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.0.do_send_nb(msg)
+ }
+}
+
+impl<T> Sink for UnboundedSender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.0.start_send(msg)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ self.0.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<'a, T> Sink for &'a UnboundedSender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.0.do_send_nb(msg)?;
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> UnboundedSender<T> {
+ UnboundedSender(self.0.clone())
+ }
+}
+
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Sender<T> {
+ // Since this atomic op isn't actually guarding any memory and we don't
+ // care about any orderings besides the ordering on the single atomic
+ // variable, a relaxed ordering is acceptable.
+ let mut curr = self.inner.num_senders.load(SeqCst);
+
+ loop {
+ // If the maximum number of senders has been reached, then fail
+ if curr == self.inner.max_senders() {
+ panic!("cannot clone `Sender` -- too many outstanding senders");
+ }
+
+ debug_assert!(curr < self.inner.max_senders());
+
+ let next = curr + 1;
+ let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
+
+ // The ABA problem doesn't matter here. We only care that the
+ // number of senders never exceeds the maximum.
+ if actual == curr {
+ return Sender {
+ inner: self.inner.clone(),
+ sender_task: Arc::new(Mutex::new(SenderTask::new())),
+ maybe_parked: false,
+ };
+ }
+
+ curr = actual;
+ }
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ // Ordering between variables don't matter here
+ let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
+
+ if prev == 1 {
+ let _ = self.do_send(None, false);
+ }
+ }
+}
+
+/*
+ *
+ * ===== impl Receiver =====
+ *
+ */
+
+impl<T> Receiver<T> {
+ /// Closes the receiving half
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ let mut curr = self.inner.state.load(SeqCst);
+
+ loop {
+ let mut state = decode_state(curr);
+
+ if !state.is_open {
+ break
+ }
+
+ state.is_open = false;
+
+ let next = encode_state(&state);
+ match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => break,
+ Err(actual) => curr = actual,
+ }
+ }
+
+ // Wake up any threads waiting as they'll see that we've closed the
+ // channel and will continue on their merry way.
+ loop {
+ match unsafe { self.inner.parked_queue.pop() } {
+ PopResult::Data(task) => {
+ task.lock().unwrap().notify();
+ }
+ PopResult::Empty => break,
+ PopResult::Inconsistent => thread::yield_now(),
+ }
+ }
+ }
+
+ fn next_message(&mut self) -> Async<Option<T>> {
+ // Pop off a message
+ loop {
+ match unsafe { self.inner.message_queue.pop() } {
+ PopResult::Data(msg) => {
+ // If there are any parked task handles in the parked queue,
+ // pop one and unpark it.
+ self.unpark_one();
+ // Decrement number of messages
+ self.dec_num_messages();
+
+ return Async::Ready(msg);
+ }
+ PopResult::Empty => {
+ // The queue is empty, return NotReady
+ return Async::NotReady;
+ }
+ PopResult::Inconsistent => {
+ // Inconsistent means that there will be a message to pop
+ // in a short time. This branch can only be reached if
+ // values are being produced from another thread, so there
+ // are a few ways that we can deal with this:
+ //
+ // 1) Spin
+ // 2) thread::yield_now()
+ // 3) task::current().unwrap() & return NotReady
+ //
+ // For now, thread::yield_now() is used, but it would
+ // probably be better to spin a few times then yield.
+ thread::yield_now();
+ }
+ }
+ }
+ }
+
+ // Unpark a single task handle if there is one pending in the parked queue
+ fn unpark_one(&mut self) {
+ loop {
+ match unsafe { self.inner.parked_queue.pop() } {
+ PopResult::Data(task) => {
+ task.lock().unwrap().notify();
+ return;
+ }
+ PopResult::Empty => {
+ // Queue empty, no task to wake up.
+ return;
+ }
+ PopResult::Inconsistent => {
+ // Same as above
+ thread::yield_now();
+ }
+ }
+ }
+ }
+
+ // Try to park the receiver task
+ fn try_park(&self) -> TryPark {
+ let curr = self.inner.state.load(SeqCst);
+ let state = decode_state(curr);
+
+ // If the channel is closed, then there is no need to park.
+ if state.is_closed() {
+ return TryPark::Closed;
+ }
+
+ // First, track the task in the `recv_task` slot
+ let mut recv_task = self.inner.recv_task.lock().unwrap();
+
+ if recv_task.unparked {
+ // Consume the `unpark` signal without actually parking
+ recv_task.unparked = false;
+ return TryPark::NotEmpty;
+ }
+
+ recv_task.task = Some(task::current());
+ TryPark::Parked
+ }
+
+ fn dec_num_messages(&self) {
+ let mut curr = self.inner.state.load(SeqCst);
+
+ loop {
+ let mut state = decode_state(curr);
+
+ state.num_messages -= 1;
+
+ let next = encode_state(&state);
+ match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+ Ok(_) => break,
+ Err(actual) => curr = actual,
+ }
+ }
+ }
+}
+
+impl<T> Stream for Receiver<T> {
+ type Item = T;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<T>, ()> {
+ loop {
+ // Try to read a message off of the message queue.
+ match self.next_message() {
+ Async::Ready(msg) => return Ok(Async::Ready(msg)),
+ Async::NotReady => {
+ // There are no messages to read, in this case, attempt to
+ // park. The act of parking will verify that the channel is
+ // still empty after the park operation has completed.
+ match self.try_park() {
+ TryPark::Parked => {
+ // The task was parked, and the channel is still
+ // empty, return NotReady.
+ return Ok(Async::NotReady);
+ }
+ TryPark::Closed => {
+ // The channel is closed, there will be no further
+ // messages.
+ return Ok(Async::Ready(None));
+ }
+ TryPark::NotEmpty => {
+ // A message has been sent while attempting to
+ // park. Loop again, the next iteration is
+ // guaranteed to get the message.
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ // Drain the channel of all pending messages
+ self.close();
+
+ loop {
+ match self.next_message() {
+ Async::Ready(_) => {}
+ Async::NotReady => {
+ let curr = self.inner.state.load(SeqCst);
+ let state = decode_state(curr);
+
+ // If the channel is closed, then there is no need to park.
+ if state.is_closed() {
+ return;
+ }
+
+ // TODO: Spinning isn't ideal, it might be worth
+ // investigating using a condvar or some other strategy
+ // here. That said, if this case is hit, then another thread
+ // is about to push the value into the queue and this isn't
+ // the only spinlock in the impl right now.
+ thread::yield_now();
+ }
+ }
+ }
+ }
+}
+
+impl<T> UnboundedReceiver<T> {
+ /// Closes the receiving half
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.0.close();
+ }
+}
+
+impl<T> Stream for UnboundedReceiver<T> {
+ type Item = T;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<T>, ()> {
+ self.0.poll()
+ }
+}
+
+/// Handle returned from the `spawn` function.
+///
+/// This handle is a stream that proxies a stream on a separate `Executor`.
+/// Created through the `mpsc::spawn` function, this handle will produce
+/// the same values as the proxied stream, as they are produced in the executor,
+/// and uses a limited buffer to exert back-pressure on the remote stream.
+///
+/// If this handle is dropped, then the stream will no longer be polled and is
+/// scheduled to be dropped.
+pub struct SpawnHandle<Item, Error> {
+ rx: Receiver<Result<Item, Error>>,
+ _cancel_tx: oneshot::Sender<()>,
+}
+
+/// Type of future which `Executor` instances must be able to execute for `spawn`.
+pub struct Execute<S: Stream> {
+ inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>,
+ cancel_rx: oneshot::Receiver<()>,
+}
+
+/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the remote stream.
+///
+/// The `stream` will be canceled if the `SpawnHandle` is dropped.
+///
+/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
+/// When `stream` has additional items available, then the `SpawnHandle`
+/// will have those same items available.
+///
+/// At most `buffer + 1` elements will be buffered at a time. If the buffer
+/// is full, then `stream` will stop progressing until more space is available.
+/// This allows the `SpawnHandle` to exert backpressure on the `stream`.
+///
+/// # Panics
+///
+/// This function will panic if `executor` is unable spawn a `Future` containing
+/// the entirety of the `stream`.
+pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
+ where S: Stream,
+ E: Executor<Execute<S>>
+{
+ let (cancel_tx, cancel_rx) = oneshot::channel();
+ let (tx, rx) = channel(buffer);
+ executor.execute(Execute {
+ inner: tx.send_all(resultstream::new(stream)),
+ cancel_rx: cancel_rx,
+ }).expect("failed to spawn stream");
+ SpawnHandle {
+ rx: rx,
+ _cancel_tx: cancel_tx,
+ }
+}
+
+/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the remote stream, with unbounded buffering.
+///
+/// The `stream` will be canceled if the `SpawnHandle` is dropped.
+///
+/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
+/// When `stream` has additional items available, then the `SpawnHandle`
+/// will have those same items available.
+///
+/// An unbounded buffer is used, which means that values will be buffered as
+/// fast as `stream` can produce them, without any backpressure. Therefore, if
+/// `stream` is an infinite stream, it can use an unbounded amount of memory, and
+/// potentially hog CPU resources.
+///
+/// # Panics
+///
+/// This function will panic if `executor` is unable spawn a `Future` containing
+/// the entirety of the `stream`.
+pub fn spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
+ where S: Stream,
+ E: Executor<Execute<S>>
+{
+ let (cancel_tx, cancel_rx) = oneshot::channel();
+ let (tx, rx) = channel2(None);
+ executor.execute(Execute {
+ inner: tx.send_all(resultstream::new(stream)),
+ cancel_rx: cancel_rx,
+ }).expect("failed to spawn stream");
+ SpawnHandle {
+ rx: rx,
+ _cancel_tx: cancel_tx,
+ }
+}
+
+impl<I, E> Stream for SpawnHandle<I, E> {
+ type Item = I;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<I>, E> {
+ match self.rx.poll() {
+ Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
+ Ok(Async::Ready(Some(Err(e)))) => Err(e),
+ Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(_) => unreachable!("mpsc::Receiver should never return Err"),
+ }
+ }
+}
+
+impl<I, E> fmt::Debug for SpawnHandle<I, E> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("SpawnHandle")
+ .finish()
+ }
+}
+
+impl<S: Stream> Future for Execute<S> {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ match self.cancel_rx.poll() {
+ Ok(Async::NotReady) => (),
+ _ => return Ok(Async::Ready(())),
+ }
+ match self.inner.poll() {
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ _ => Ok(Async::Ready(()))
+ }
+ }
+}
+
+impl<S: Stream> fmt::Debug for Execute<S> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Execute")
+ .finish()
+ }
+}
+
+/*
+ *
+ * ===== impl Inner =====
+ *
+ */
+
+impl<T> Inner<T> {
+ // The return value is such that the total number of messages that can be
+ // enqueued into the channel will never exceed MAX_CAPACITY
+ fn max_senders(&self) -> usize {
+ match self.buffer {
+ Some(buffer) => MAX_CAPACITY - buffer,
+ None => MAX_BUFFER,
+ }
+ }
+}
+
+unsafe impl<T: Send> Send for Inner<T> {}
+unsafe impl<T: Send> Sync for Inner<T> {}
+
+impl State {
+ fn is_closed(&self) -> bool {
+ !self.is_open && self.num_messages == 0
+ }
+}
+
+/*
+ *
+ * ===== Helpers =====
+ *
+ */
+
+fn decode_state(num: usize) -> State {
+ State {
+ is_open: num & OPEN_MASK == OPEN_MASK,
+ num_messages: num & MAX_CAPACITY,
+ }
+}
+
+fn encode_state(state: &State) -> usize {
+ let mut num = state.num_messages;
+
+ if state.is_open {
+ num |= OPEN_MASK;
+ }
+
+ num
+}
diff --git a/third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs b/third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs
new file mode 100644
index 0000000000..9ff6bcf873
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs
@@ -0,0 +1,151 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue.
+//!
+//! This module contains an implementation of a concurrent MPSC queue. This
+//! queue can be used to share data between threads, and is also used as the
+//! building block of channels in rust.
+//!
+//! Note that the current implementation of this queue has a caveat of the `pop`
+//! method, and see the method for more information about it. Due to this
+//! caveat, this queue may not be appropriate for all use-cases.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+// /queues/non-intrusive-mpsc-node-based-queue
+
+// NOTE: this implementation is lifted from the standard library and only
+// slightly modified
+
+pub use self::PopResult::*;
+use std::prelude::v1::*;
+
+use std::cell::UnsafeCell;
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, Ordering};
+
+/// A result of the `pop` function.
+pub enum PopResult<T> {
+ /// Some data has been popped
+ Data(T),
+ /// The queue is empty
+ Empty,
+ /// The queue is in an inconsistent state. Popping data should succeed, but
+ /// some pushers have yet to make enough progress in order allow a pop to
+ /// succeed. It is recommended that a pop() occur "in the near future" in
+ /// order to see if the sender has made progress or not
+ Inconsistent,
+}
+
+#[derive(Debug)]
+struct Node<T> {
+ next: AtomicPtr<Node<T>>,
+ value: Option<T>,
+}
+
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+#[derive(Debug)]
+pub struct Queue<T> {
+ head: AtomicPtr<Node<T>>,
+ tail: UnsafeCell<*mut Node<T>>,
+}
+
+unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send> Sync for Queue<T> { }
+
+impl<T> Node<T> {
+ unsafe fn new(v: Option<T>) -> *mut Node<T> {
+ Box::into_raw(Box::new(Node {
+ next: AtomicPtr::new(ptr::null_mut()),
+ value: v,
+ }))
+ }
+}
+
+impl<T> Queue<T> {
+ /// Creates a new queue that is safe to share among multiple producers and
+ /// one consumer.
+ pub fn new() -> Queue<T> {
+ let stub = unsafe { Node::new(None) };
+ Queue {
+ head: AtomicPtr::new(stub),
+ tail: UnsafeCell::new(stub),
+ }
+ }
+
+ /// Pushes a new value onto this queue.
+ pub fn push(&self, t: T) {
+ unsafe {
+ let n = Node::new(Some(t));
+ let prev = self.head.swap(n, Ordering::AcqRel);
+ (*prev).next.store(n, Ordering::Release);
+ }
+ }
+
+ /// Pops some data from this queue.
+ ///
+ /// Note that the current implementation means that this function cannot
+ /// return `Option<T>`. It is possible for this queue to be in an
+ /// inconsistent state where many pushes have succeeded and completely
+ /// finished, but pops cannot return `Some(t)`. This inconsistent state
+ /// happens when a pusher is preempted at an inopportune moment.
+ ///
+ /// This inconsistent state means that this queue does indeed have data, but
+ /// it does not currently have access to it at this time.
+ ///
+ /// This function is unsafe because only one thread can call it at a time.
+ pub unsafe fn pop(&self) -> PopResult<T> {
+ let tail = *self.tail.get();
+ let next = (*tail).next.load(Ordering::Acquire);
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+ assert!((*tail).value.is_none());
+ assert!((*next).value.is_some());
+ let ret = (*next).value.take().unwrap();
+ drop(Box::from_raw(tail));
+ return Data(ret);
+ }
+
+ if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let mut cur = *self.tail.get();
+ while !cur.is_null() {
+ let next = (*cur).next.load(Ordering::Relaxed);
+ drop(Box::from_raw(cur));
+ cur = next;
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/sync/oneshot.rs b/third_party/rust/futures-0.1.31/src/sync/oneshot.rs
new file mode 100644
index 0000000000..3a9d8efdca
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/sync/oneshot.rs
@@ -0,0 +1,611 @@
+//! A one-shot, futures-aware channel
+
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering::SeqCst;
+use std::error::Error;
+use std::fmt;
+
+use {Future, Poll, Async};
+use future::{lazy, Lazy, Executor, IntoFuture};
+use lock::Lock;
+use task::{self, Task};
+
+/// A future representing the completion of a computation happening elsewhere in
+/// memory.
+///
+/// This is created by the `oneshot::channel` function.
+#[must_use = "futures do nothing unless polled"]
+#[derive(Debug)]
+pub struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+/// Represents the completion half of a oneshot through which the result of a
+/// computation is signaled.
+///
+/// This is created by the `oneshot::channel` function.
+#[derive(Debug)]
+pub struct Sender<T> {
+ inner: Arc<Inner<T>>,
+}
+
+/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
+/// the internal synchronization between the two for send/recv operations.
+#[derive(Debug)]
+struct Inner<T> {
+ /// Indicates whether this oneshot is complete yet. This is filled in both
+ /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
+ /// appropriately.
+ ///
+ /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
+ /// unlocked and ready to be inspected.
+ ///
+ /// For `Sender` if this is `true` then the oneshot has gone away and it
+ /// can return ready from `poll_cancel`.
+ complete: AtomicBool,
+
+ /// The actual data being transferred as part of this `Receiver`. This is
+ /// filled in by `Sender::complete` and read by `Receiver::poll`.
+ ///
+ /// Note that this is protected by `Lock`, but it is in theory safe to
+ /// replace with an `UnsafeCell` as it's actually protected by `complete`
+ /// above. I wouldn't recommend doing this, however, unless someone is
+ /// supremely confident in the various atomic orderings here and there.
+ data: Lock<Option<T>>,
+
+ /// Field to store the task which is blocked in `Receiver::poll`.
+ ///
+ /// This is filled in when a oneshot is polled but not ready yet. Note that
+ /// the `Lock` here, unlike in `data` above, is important to resolve races.
+ /// Both the `Receiver` and the `Sender` halves understand that if they
+ /// can't acquire the lock then some important interference is happening.
+ rx_task: Lock<Option<Task>>,
+
+ /// Like `rx_task` above, except for the task blocked in
+ /// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
+ tx_task: Lock<Option<Task>>,
+}
+
+/// Creates a new futures-aware, one-shot channel.
+///
+/// This function is similar to Rust's channels found in the standard library.
+/// Two halves are returned, the first of which is a `Sender` handle, used to
+/// signal the end of a computation and provide its value. The second half is a
+/// `Receiver` which implements the `Future` trait, resolving to the value that
+/// was given to the `Sender` handle.
+///
+/// Each half can be separately owned and sent across threads/tasks.
+///
+/// # Examples
+///
+/// ```
+/// use std::thread;
+/// use futures::sync::oneshot;
+/// use futures::*;
+///
+/// let (p, c) = oneshot::channel::<i32>();
+///
+/// thread::spawn(|| {
+/// c.map(|i| {
+/// println!("got: {}", i);
+/// }).wait();
+/// });
+///
+/// p.send(3).unwrap();
+/// ```
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner::new());
+ let receiver = Receiver {
+ inner: inner.clone(),
+ };
+ let sender = Sender {
+ inner: inner,
+ };
+ (sender, receiver)
+}
+
+impl<T> Inner<T> {
+ fn new() -> Inner<T> {
+ Inner {
+ complete: AtomicBool::new(false),
+ data: Lock::new(None),
+ rx_task: Lock::new(None),
+ tx_task: Lock::new(None),
+ }
+ }
+
+ fn send(&self, t: T) -> Result<(), T> {
+ if self.complete.load(SeqCst) {
+ return Err(t)
+ }
+
+ // Note that this lock acquisition may fail if the receiver
+ // is closed and sets the `complete` flag to true, whereupon
+ // the receiver may call `poll()`.
+ if let Some(mut slot) = self.data.try_lock() {
+ assert!(slot.is_none());
+ *slot = Some(t);
+ drop(slot);
+
+ // If the receiver called `close()` between the check at the
+ // start of the function, and the lock being released, then
+ // the receiver may not be around to receive it, so try to
+ // pull it back out.
+ if self.complete.load(SeqCst) {
+ // If lock acquisition fails, then receiver is actually
+ // receiving it, so we're good.
+ if let Some(mut slot) = self.data.try_lock() {
+ if let Some(t) = slot.take() {
+ return Err(t);
+ }
+ }
+ }
+ Ok(())
+ } else {
+ // Must have been closed
+ Err(t)
+ }
+ }
+
+ fn poll_cancel(&self) -> Poll<(), ()> {
+ // Fast path up first, just read the flag and see if our other half is
+ // gone. This flag is set both in our destructor and the oneshot
+ // destructor, but our destructor hasn't run yet so if it's set then the
+ // oneshot is gone.
+ if self.complete.load(SeqCst) {
+ return Ok(Async::Ready(()))
+ }
+
+ // If our other half is not gone then we need to park our current task
+ // and move it into the `notify_cancel` slot to get notified when it's
+ // actually gone.
+ //
+ // If `try_lock` fails, then the `Receiver` is in the process of using
+ // it, so we can deduce that it's now in the process of going away and
+ // hence we're canceled. If it succeeds then we just store our handle.
+ //
+ // Crucially we then check `oneshot_gone` *again* before we return.
+ // While we were storing our handle inside `notify_cancel` the `Receiver`
+ // may have been dropped. The first thing it does is set the flag, and
+ // if it fails to acquire the lock it assumes that we'll see the flag
+ // later on. So... we then try to see the flag later on!
+ let handle = task::current();
+ match self.tx_task.try_lock() {
+ Some(mut p) => *p = Some(handle),
+ None => return Ok(Async::Ready(())),
+ }
+ if self.complete.load(SeqCst) {
+ Ok(Async::Ready(()))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+
+ fn is_canceled(&self) -> bool {
+ self.complete.load(SeqCst)
+ }
+
+ fn drop_tx(&self) {
+ // Flag that we're a completed `Sender` and try to wake up a receiver.
+ // Whether or not we actually stored any data will get picked up and
+ // translated to either an item or cancellation.
+ //
+ // Note that if we fail to acquire the `rx_task` lock then that means
+ // we're in one of two situations:
+ //
+ // 1. The receiver is trying to block in `poll`
+ // 2. The receiver is being dropped
+ //
+ // In the first case it'll check the `complete` flag after it's done
+ // blocking to see if it succeeded. In the latter case we don't need to
+ // wake up anyone anyway. So in both cases it's ok to ignore the `None`
+ // case of `try_lock` and bail out.
+ //
+ // The first case crucially depends on `Lock` using `SeqCst` ordering
+ // under the hood. If it instead used `Release` / `Acquire` ordering,
+ // then it would not necessarily synchronize with `inner.complete`
+ // and deadlock might be possible, as was observed in
+ // https://github.com/rust-lang-nursery/futures-rs/pull/219.
+ self.complete.store(true, SeqCst);
+ if let Some(mut slot) = self.rx_task.try_lock() {
+ if let Some(task) = slot.take() {
+ drop(slot);
+ task.notify();
+ }
+ }
+ }
+
+ fn close_rx(&self) {
+ // Flag our completion and then attempt to wake up the sender if it's
+ // blocked. See comments in `drop` below for more info
+ self.complete.store(true, SeqCst);
+ if let Some(mut handle) = self.tx_task.try_lock() {
+ if let Some(task) = handle.take() {
+ drop(handle);
+ task.notify()
+ }
+ }
+ }
+
+ fn try_recv(&self) -> Result<Option<T>, Canceled> {
+ // If we're complete, either `::close_rx` or `::drop_tx` was called.
+ // We can assume a successful send if data is present.
+ if self.complete.load(SeqCst) {
+ if let Some(mut slot) = self.data.try_lock() {
+ if let Some(data) = slot.take() {
+ return Ok(Some(data.into()));
+ }
+ }
+ // Should there be a different error value or a panic in the case
+ // where `self.data.try_lock() == None`?
+ Err(Canceled)
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn recv(&self) -> Poll<T, Canceled> {
+ let mut done = false;
+
+ // Check to see if some data has arrived. If it hasn't then we need to
+ // block our task.
+ //
+ // Note that the acquisition of the `rx_task` lock might fail below, but
+ // the only situation where this can happen is during `Sender::drop`
+ // when we are indeed completed already. If that's happening then we
+ // know we're completed so keep going.
+ if self.complete.load(SeqCst) {
+ done = true;
+ } else {
+ let task = task::current();
+ match self.rx_task.try_lock() {
+ Some(mut slot) => *slot = Some(task),
+ None => done = true,
+ }
+ }
+
+ // If we're `done` via one of the paths above, then look at the data and
+ // figure out what the answer is. If, however, we stored `rx_task`
+ // successfully above we need to check again if we're completed in case
+ // a message was sent while `rx_task` was locked and couldn't notify us
+ // otherwise.
+ //
+ // If we're not done, and we're not complete, though, then we've
+ // successfully blocked our task and we return `NotReady`.
+ if done || self.complete.load(SeqCst) {
+ // If taking the lock fails, the sender will realise that the we're
+ // `done` when it checks the `complete` flag on the way out, and will
+ // treat the send as a failure.
+ if let Some(mut slot) = self.data.try_lock() {
+ if let Some(data) = slot.take() {
+ return Ok(data.into());
+ }
+ }
+ Err(Canceled)
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+
+ fn drop_rx(&self) {
+ // Indicate to the `Sender` that we're done, so any future calls to
+ // `poll_cancel` are weeded out.
+ self.complete.store(true, SeqCst);
+
+ // If we've blocked a task then there's no need for it to stick around,
+ // so we need to drop it. If this lock acquisition fails, though, then
+ // it's just because our `Sender` is trying to take the task, so we
+ // let them take care of that.
+ if let Some(mut slot) = self.rx_task.try_lock() {
+ let task = slot.take();
+ drop(slot);
+ drop(task);
+ }
+
+ // Finally, if our `Sender` wants to get notified of us going away, it
+ // would have stored something in `tx_task`. Here we try to peel that
+ // out and unpark it.
+ //
+ // Note that the `try_lock` here may fail, but only if the `Sender` is
+ // in the process of filling in the task. If that happens then we
+ // already flagged `complete` and they'll pick that up above.
+ if let Some(mut handle) = self.tx_task.try_lock() {
+ if let Some(task) = handle.take() {
+ drop(handle);
+ task.notify()
+ }
+ }
+ }
+}
+
+impl<T> Sender<T> {
+ #[deprecated(note = "renamed to `send`", since = "0.1.11")]
+ #[doc(hidden)]
+ #[cfg(feature = "with-deprecated")]
+ pub fn complete(self, t: T) {
+ drop(self.send(t));
+ }
+
+ /// Completes this oneshot with a successful result.
+ ///
+ /// This function will consume `self` and indicate to the other end, the
+ /// `Receiver`, that the value provided is the result of the computation this
+ /// represents.
+ ///
+ /// If the value is successfully enqueued for the remote end to receive,
+ /// then `Ok(())` is returned. If the receiving end was deallocated before
+ /// this function was called, however, then `Err` is returned with the value
+ /// provided.
+ pub fn send(self, t: T) -> Result<(), T> {
+ self.inner.send(t)
+ }
+
+ /// Polls this `Sender` half to detect whether the `Receiver` this has
+ /// paired with has gone away.
+ ///
+ /// This function can be used to learn about when the `Receiver` (consumer)
+ /// half has gone away and nothing will be able to receive a message sent
+ /// from `send`.
+ ///
+ /// If `Ready` is returned then it means that the `Receiver` has disappeared
+ /// and the result this `Sender` would otherwise produce should no longer
+ /// be produced.
+ ///
+ /// If `NotReady` is returned then the `Receiver` is still alive and may be
+ /// able to receive a message if sent. The current task, however, is
+ /// scheduled to receive a notification if the corresponding `Receiver` goes
+ /// away.
+ ///
+ /// # Panics
+ ///
+ /// Like `Future::poll`, this function will panic if it's not called from
+ /// within the context of a task. In other words, this should only ever be
+ /// called from inside another future.
+ ///
+ /// If `Ok(Ready)` is returned then the associated `Receiver` has been
+ /// dropped, which means any work required for sending should be canceled.
+ ///
+ /// If you're calling this function from a context that does not have a
+ /// task, then you can use the `is_canceled` API instead.
+ pub fn poll_cancel(&mut self) -> Poll<(), ()> {
+ self.inner.poll_cancel()
+ }
+
+ /// Tests to see whether this `Sender`'s corresponding `Receiver`
+ /// has gone away.
+ ///
+ /// This function can be used to learn about when the `Receiver` (consumer)
+ /// half has gone away and nothing will be able to receive a message sent
+ /// from `send`.
+ ///
+ /// Note that this function is intended to *not* be used in the context of a
+ /// future. If you're implementing a future you probably want to call the
+ /// `poll_cancel` function which will block the current task if the
+ /// cancellation hasn't happened yet. This can be useful when working on a
+ /// non-futures related thread, though, which would otherwise panic if
+ /// `poll_cancel` were called.
+ pub fn is_canceled(&self) -> bool {
+ self.inner.is_canceled()
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ self.inner.drop_tx()
+ }
+}
+
+/// Error returned from a `Receiver<T>` whenever the corresponding `Sender<T>`
+/// is dropped.
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub struct Canceled;
+
+impl fmt::Display for Canceled {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "oneshot canceled")
+ }
+}
+
+impl Error for Canceled {
+ fn description(&self) -> &str {
+ "oneshot canceled"
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Gracefully close this receiver, preventing sending any future messages.
+ ///
+ /// Any `send` operation which happens after this method returns is
+ /// guaranteed to fail. Once this method is called the normal `poll` method
+ /// can be used to determine whether a message was actually sent or not. If
+ /// `Canceled` is returned from `poll` then no message was sent.
+ pub fn close(&mut self) {
+ self.inner.close_rx()
+ }
+
+ /// Attempts to receive a message outside of the context of a task.
+ ///
+ /// Useful when a [`Context`](Context) is not available such as within a
+ /// `Drop` impl.
+ ///
+ /// Does not schedule a task wakeup or have any other side effects.
+ ///
+ /// A return value of `None` must be considered immediately stale (out of
+ /// date) unless [`::close`](Receiver::close) has been called first.
+ ///
+ /// Returns an error if the sender was dropped.
+ pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
+ self.inner.try_recv()
+ }
+}
+
+impl<T> Future for Receiver<T> {
+ type Item = T;
+ type Error = Canceled;
+
+ fn poll(&mut self) -> Poll<T, Canceled> {
+ self.inner.recv()
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ self.inner.drop_rx()
+ }
+}
+
+/// Handle returned from the `spawn` function.
+///
+/// This handle is a future representing the completion of a different future on
+/// a separate executor. Created through the `oneshot::spawn` function this
+/// handle will resolve when the future provided to `spawn` resolves on the
+/// `Executor` instance provided to that function.
+///
+/// If this handle is dropped then the future will automatically no longer be
+/// polled and is scheduled to be dropped. This can be canceled with the
+/// `forget` function, however.
+pub struct SpawnHandle<T, E> {
+ rx: Arc<ExecuteInner<Result<T, E>>>,
+}
+
+struct ExecuteInner<T> {
+ inner: Inner<T>,
+ keep_running: AtomicBool,
+}
+
+/// Type of future which `Execute` instances below must be able to spawn.
+pub struct Execute<F: Future> {
+ future: F,
+ tx: Arc<ExecuteInner<Result<F::Item, F::Error>>>,
+}
+
+/// Spawns a `future` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the completion of the future.
+///
+/// The `SpawnHandle` returned is a future that is a proxy for `future` itself.
+/// When `future` completes on `executor` then the `SpawnHandle` will itself be
+/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is
+/// thus safe to send across threads.
+///
+/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is
+/// not desired then the `SpawnHandle::forget` function can be used to continue
+/// running the future to completion.
+///
+/// # Panics
+///
+/// This function will panic if the instance of `Spawn` provided is unable to
+/// spawn the `future` provided.
+///
+/// If the provided instance of `Spawn` does not actually run `future` to
+/// completion, then the returned handle may panic when polled. Typically this
+/// is not a problem, though, as most instances of `Spawn` will run futures to
+/// completion.
+///
+/// Note that the returned future will likely panic if the `futures` provided
+/// panics. If a future running on an executor panics that typically means that
+/// the executor drops the future, which falls into the above case of not
+/// running the future to completion essentially.
+pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error>
+ where F: Future,
+ E: Executor<Execute<F>>,
+{
+ let data = Arc::new(ExecuteInner {
+ inner: Inner::new(),
+ keep_running: AtomicBool::new(false),
+ });
+ executor.execute(Execute {
+ future: future,
+ tx: data.clone(),
+ }).expect("failed to spawn future");
+ SpawnHandle { rx: data }
+}
+
+/// Spawns a function `f` onto the `Spawn` instance provided `s`.
+///
+/// For more information see the `spawn` function in this module. This function
+/// is just a thin wrapper around `spawn` which will execute the closure on the
+/// executor provided and then complete the future that the closure returns.
+pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error>
+ where F: FnOnce() -> R,
+ R: IntoFuture,
+ E: Executor<Execute<Lazy<F, R>>>,
+{
+ spawn(lazy(f), executor)
+}
+
+impl<T, E> SpawnHandle<T, E> {
+ /// Drop this future without canceling the underlying future.
+ ///
+ /// When `SpawnHandle` is dropped, the spawned future will be canceled as
+ /// well if the future hasn't already resolved. This function can be used
+ /// when to drop this future but keep executing the underlying future.
+ pub fn forget(self) {
+ self.rx.keep_running.store(true, SeqCst);
+ }
+}
+
+impl<T, E> Future for SpawnHandle<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<T, E> {
+ match self.rx.inner.recv() {
+ Ok(Async::Ready(Ok(t))) => Ok(t.into()),
+ Ok(Async::Ready(Err(e))) => Err(e),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(_) => panic!("future was canceled before completion"),
+ }
+ }
+}
+
+impl<T: fmt::Debug, E: fmt::Debug> fmt::Debug for SpawnHandle<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("SpawnHandle")
+ .finish()
+ }
+}
+
+impl<T, E> Drop for SpawnHandle<T, E> {
+ fn drop(&mut self) {
+ self.rx.inner.drop_rx();
+ }
+}
+
+impl<F: Future> Future for Execute<F> {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ // If we're canceled then we may want to bail out early.
+ //
+ // If the `forget` function was called, though, then we keep going.
+ if self.tx.inner.poll_cancel().unwrap().is_ready() {
+ if !self.tx.keep_running.load(SeqCst) {
+ return Ok(().into())
+ }
+ }
+
+ let result = match self.future.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Ok(Async::Ready(t)) => Ok(t),
+ Err(e) => Err(e),
+ };
+ drop(self.tx.inner.send(result));
+ Ok(().into())
+ }
+}
+
+impl<F: Future + fmt::Debug> fmt::Debug for Execute<F> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Execute")
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<F: Future> Drop for Execute<F> {
+ fn drop(&mut self) {
+ self.tx.inner.drop_tx();
+ }
+}