summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/unsync
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/unsync
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/unsync')
-rw-r--r--third_party/rust/futures-0.1.31/src/unsync/mod.rs7
-rw-r--r--third_party/rust/futures-0.1.31/src/unsync/mpsc.rs474
-rw-r--r--third_party/rust/futures-0.1.31/src/unsync/oneshot.rs351
3 files changed, 832 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/unsync/mod.rs b/third_party/rust/futures-0.1.31/src/unsync/mod.rs
new file mode 100644
index 0000000000..aaa5a707ba
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/unsync/mod.rs
@@ -0,0 +1,7 @@
+//! Future-aware single-threaded synchronization
+//!
+//! This module contains similar abstractions to `sync`, for communications
+//! between tasks on the same thread only.
+
+pub mod mpsc;
+pub mod oneshot;
diff --git a/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs
new file mode 100644
index 0000000000..ba0d52dc98
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs
@@ -0,0 +1,474 @@
+//! A multi-producer, single-consumer, futures-aware, FIFO queue with back
+//! pressure, for use communicating between tasks on the same thread.
+//!
+//! These queues are the same as those in `futures::sync`, except they're not
+//! intended to be sent across threads.
+
+use std::any::Any;
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::error::Error;
+use std::fmt;
+use std::mem;
+use std::rc::{Rc, Weak};
+
+use task::{self, Task};
+use future::Executor;
+use sink::SendAll;
+use resultstream::{self, Results};
+use unsync::oneshot;
+use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
+
+/// Creates a bounded in-memory channel with buffered storage.
+///
+/// This method creates concrete implementations of the `Stream` and `Sink`
+/// traits which can be used to communicate a stream of values between tasks
+/// with backpressure. The channel capacity is exactly `buffer`. On average,
+/// sending a message through this channel performs no dynamic allocation.
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ channel_(Some(buffer))
+}
+
+fn channel_<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
+ let shared = Rc::new(RefCell::new(Shared {
+ buffer: VecDeque::new(),
+ capacity: buffer,
+ blocked_senders: VecDeque::new(),
+ blocked_recv: None,
+ }));
+ let sender = Sender { shared: Rc::downgrade(&shared) };
+ let receiver = Receiver { state: State::Open(shared) };
+ (sender, receiver)
+}
+
+#[derive(Debug)]
+struct Shared<T> {
+ buffer: VecDeque<T>,
+ capacity: Option<usize>,
+ blocked_senders: VecDeque<Task>,
+ blocked_recv: Option<Task>,
+}
+
+/// The transmission end of a channel.
+///
+/// This is created by the `channel` function.
+#[derive(Debug)]
+pub struct Sender<T> {
+ shared: Weak<RefCell<Shared<T>>>,
+}
+
+impl<T> Sender<T> {
+ fn do_send(&self, msg: T) -> StartSend<T, SendError<T>> {
+ let shared = match self.shared.upgrade() {
+ Some(shared) => shared,
+ None => return Err(SendError(msg)), // receiver was dropped
+ };
+ let mut shared = shared.borrow_mut();
+
+ match shared.capacity {
+ Some(capacity) if shared.buffer.len() == capacity => {
+ shared.blocked_senders.push_back(task::current());
+ Ok(AsyncSink::NotReady(msg))
+ }
+ _ => {
+ shared.buffer.push_back(msg);
+ if let Some(task) = shared.blocked_recv.take() {
+ task.notify();
+ }
+ Ok(AsyncSink::Ready)
+ }
+ }
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender { shared: self.shared.clone() }
+ }
+}
+
+impl<T> Sink for Sender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.do_send(msg)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ let shared = match self.shared.upgrade() {
+ Some(shared) => shared,
+ None => return,
+ };
+ // The number of existing `Weak` indicates if we are possibly the last
+ // `Sender`. If we are the last, we possibly must notify a blocked
+ // `Receiver`. `self.shared` is always one of the `Weak` to this shared
+ // data. Therefore the smallest possible Rc::weak_count(&shared) is 1.
+ if Rc::weak_count(&shared) == 1 {
+ if let Some(task) = shared.borrow_mut().blocked_recv.take() {
+ // Wake up receiver as its stream has ended
+ task.notify();
+ }
+ }
+ }
+}
+
+/// The receiving end of a channel which implements the `Stream` trait.
+///
+/// This is created by the `channel` function.
+#[derive(Debug)]
+pub struct Receiver<T> {
+ state: State<T>,
+}
+
+/// Possible states of a receiver. We're either Open (can receive more messages)
+/// or we're closed with a list of messages we have left to receive.
+#[derive(Debug)]
+enum State<T> {
+ Open(Rc<RefCell<Shared<T>>>),
+ Closed(VecDeque<T>),
+}
+
+impl<T> Receiver<T> {
+ /// Closes the receiving half
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ let (blockers, items) = match self.state {
+ State::Open(ref state) => {
+ let mut state = state.borrow_mut();
+ let items = mem::replace(&mut state.buffer, VecDeque::new());
+ let blockers = mem::replace(&mut state.blocked_senders, VecDeque::new());
+ (blockers, items)
+ }
+ State::Closed(_) => return,
+ };
+ self.state = State::Closed(items);
+ for task in blockers {
+ task.notify();
+ }
+ }
+}
+
+impl<T> Stream for Receiver<T> {
+ type Item = T;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let me = match self.state {
+ State::Open(ref mut me) => me,
+ State::Closed(ref mut items) => {
+ return Ok(Async::Ready(items.pop_front()))
+ }
+ };
+
+ if let Some(shared) = Rc::get_mut(me) {
+ // All senders have been dropped, so drain the buffer and end the
+ // stream.
+ return Ok(Async::Ready(shared.borrow_mut().buffer.pop_front()));
+ }
+
+ let mut shared = me.borrow_mut();
+ if let Some(msg) = shared.buffer.pop_front() {
+ if let Some(task) = shared.blocked_senders.pop_front() {
+ drop(shared);
+ task.notify();
+ }
+ Ok(Async::Ready(Some(msg)))
+ } else {
+ shared.blocked_recv = Some(task::current());
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ self.close();
+ }
+}
+
+/// The transmission end of an unbounded channel.
+///
+/// This is created by the `unbounded` function.
+#[derive(Debug)]
+pub struct UnboundedSender<T>(Sender<T>);
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> Self {
+ UnboundedSender(self.0.clone())
+ }
+}
+
+impl<T> Sink for UnboundedSender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.0.start_send(msg)
+ }
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<'a, T> Sink for &'a UnboundedSender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.0.do_send(msg)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ #[deprecated(note = "renamed to `unbounded_send`")]
+ #[doc(hidden)]
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.unbounded_send(msg)
+ }
+
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
+ let shared = match self.0.shared.upgrade() {
+ Some(shared) => shared,
+ None => return Err(SendError(msg)),
+ };
+ let mut shared = shared.borrow_mut();
+ shared.buffer.push_back(msg);
+ if let Some(task) = shared.blocked_recv.take() {
+ drop(shared);
+ task.notify();
+ }
+ Ok(())
+ }
+}
+
+/// The receiving end of an unbounded channel.
+///
+/// This is created by the `unbounded` function.
+#[derive(Debug)]
+pub struct UnboundedReceiver<T>(Receiver<T>);
+
+impl<T> UnboundedReceiver<T> {
+ /// Closes the receiving half
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.0.close();
+ }
+}
+
+impl<T> Stream for UnboundedReceiver<T> {
+ type Item = T;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.0.poll()
+ }
+}
+
+/// Creates an unbounded in-memory channel with buffered storage.
+///
+/// Identical semantics to `channel`, except with no limit to buffer size.
+pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let (send, recv) = channel_(None);
+ (UnboundedSender(send), UnboundedReceiver(recv))
+}
+
+/// Error type for sending, used when the receiving end of a channel is
+/// dropped
+pub struct SendError<T>(T);
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("SendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "send failed because receiver is gone")
+ }
+}
+
+impl<T: Any> Error for SendError<T> {
+ fn description(&self) -> &str {
+ "send failed because receiver is gone"
+ }
+}
+
+impl<T> SendError<T> {
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+/// Handle returned from the `spawn` function.
+///
+/// This handle is a stream that proxies a stream on a separate `Executor`.
+/// Created through the `mpsc::spawn` function, this handle will produce
+/// the same values as the proxied stream, as they are produced in the executor,
+/// and uses a limited buffer to exert back-pressure on the remote stream.
+///
+/// If this handle is dropped, then the stream will no longer be polled and is
+/// scheduled to be dropped.
+pub struct SpawnHandle<Item, Error> {
+ inner: Receiver<Result<Item, Error>>,
+ _cancel_tx: oneshot::Sender<()>,
+}
+
+/// Type of future which `Executor` instances must be able to execute for `spawn`.
+pub struct Execute<S: Stream> {
+ inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>,
+ cancel_rx: oneshot::Receiver<()>,
+}
+
+/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the remote stream.
+///
+/// The `stream` will be canceled if the `SpawnHandle` is dropped.
+///
+/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
+/// When `stream` has additional items available, then the `SpawnHandle`
+/// will have those same items available.
+///
+/// At most `buffer + 1` elements will be buffered at a time. If the buffer
+/// is full, then `stream` will stop progressing until more space is available.
+/// This allows the `SpawnHandle` to exert backpressure on the `stream`.
+///
+/// # Panics
+///
+/// This function will panic if `executor` is unable spawn a `Future` containing
+/// the entirety of the `stream`.
+pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
+ where S: Stream,
+ E: Executor<Execute<S>>
+{
+ let (cancel_tx, cancel_rx) = oneshot::channel();
+ let (tx, rx) = channel(buffer);
+ executor.execute(Execute {
+ inner: tx.send_all(resultstream::new(stream)),
+ cancel_rx: cancel_rx,
+ }).expect("failed to spawn stream");
+ SpawnHandle {
+ inner: rx,
+ _cancel_tx: cancel_tx,
+ }
+}
+
+/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the remote stream, with unbounded buffering.
+///
+/// The `stream` will be canceled if the `SpawnHandle` is dropped.
+///
+/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
+/// When `stream` has additional items available, then the `SpawnHandle`
+/// will have those same items available.
+///
+/// An unbounded buffer is used, which means that values will be buffered as
+/// fast as `stream` can produce them, without any backpressure. Therefore, if
+/// `stream` is an infinite stream, it can use an unbounded amount of memory, and
+/// potentially hog CPU resources. In particular, if `stream` is infinite
+/// and doesn't ever yield (by returning `Async::NotReady` from `poll`), it
+/// will result in an infinite loop.
+///
+/// # Panics
+///
+/// This function will panic if `executor` is unable spawn a `Future` containing
+/// the entirety of the `stream`.
+pub fn spawn_unbounded<S,E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
+ where S: Stream,
+ E: Executor<Execute<S>>
+{
+ let (cancel_tx, cancel_rx) = oneshot::channel();
+ let (tx, rx) = channel_(None);
+ executor.execute(Execute {
+ inner: tx.send_all(resultstream::new(stream)),
+ cancel_rx: cancel_rx,
+ }).expect("failed to spawn stream");
+ SpawnHandle {
+ inner: rx,
+ _cancel_tx: cancel_tx,
+ }
+}
+
+impl<I, E> Stream for SpawnHandle<I, E> {
+ type Item = I;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<I>, E> {
+ match self.inner.poll() {
+ Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
+ Ok(Async::Ready(Some(Err(e)))) => Err(e),
+ Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(_) => unreachable!("mpsc::Receiver should never return Err"),
+ }
+ }
+}
+
+impl<I, E> fmt::Debug for SpawnHandle<I, E> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("SpawnHandle")
+ .finish()
+ }
+}
+
+impl<S: Stream> Future for Execute<S> {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ match self.cancel_rx.poll() {
+ Ok(Async::NotReady) => (),
+ _ => return Ok(Async::Ready(())),
+ }
+ match self.inner.poll() {
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ _ => Ok(Async::Ready(()))
+ }
+ }
+}
+
+impl<S: Stream> fmt::Debug for Execute<S> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Execute")
+ .finish()
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/src/unsync/oneshot.rs b/third_party/rust/futures-0.1.31/src/unsync/oneshot.rs
new file mode 100644
index 0000000000..7ae2890f9e
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/unsync/oneshot.rs
@@ -0,0 +1,351 @@
+//! A one-shot, futures-aware channel
+//!
+//! This channel is similar to that in `sync::oneshot` but cannot be sent across
+//! threads.
+
+use std::cell::{Cell, RefCell};
+use std::fmt;
+use std::rc::{Rc, Weak};
+
+use {Future, Poll, Async};
+use future::{Executor, IntoFuture, Lazy, lazy};
+use task::{self, Task};
+
+/// Creates a new futures-aware, one-shot channel.
+///
+/// This function is the same as `sync::oneshot::channel` except that the
+/// returned values cannot be sent across threads.
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Rc::new(RefCell::new(Inner {
+ value: None,
+ tx_task: None,
+ rx_task: None,
+ }));
+ let tx = Sender {
+ inner: Rc::downgrade(&inner),
+ };
+ let rx = Receiver {
+ state: State::Open(inner),
+ };
+ (tx, rx)
+}
+
+/// Represents the completion half of a oneshot through which the result of a
+/// computation is signaled.
+///
+/// This is created by the `unsync::oneshot::channel` function and is equivalent
+/// in functionality to `sync::oneshot::Sender` except that it cannot be sent
+/// across threads.
+#[derive(Debug)]
+pub struct Sender<T> {
+ inner: Weak<RefCell<Inner<T>>>,
+}
+
+/// A future representing the completion of a computation happening elsewhere in
+/// memory.
+///
+/// This is created by the `unsync::oneshot::channel` function and is equivalent
+/// in functionality to `sync::oneshot::Receiver` except that it cannot be sent
+/// across threads.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct Receiver<T> {
+ state: State<T>,
+}
+
+#[derive(Debug)]
+enum State<T> {
+ Open(Rc<RefCell<Inner<T>>>),
+ Closed(Option<T>),
+}
+
+pub use sync::oneshot::Canceled;
+
+#[derive(Debug)]
+struct Inner<T> {
+ value: Option<T>,
+ tx_task: Option<Task>,
+ rx_task: Option<Task>,
+}
+
+impl<T> Sender<T> {
+ /// Completes this oneshot with a successful result.
+ ///
+ /// This function will consume `self` and indicate to the other end, the
+ /// `Receiver`, that the error provided is the result of the computation this
+ /// represents.
+ ///
+ /// If the value is successfully enqueued for the remote end to receive,
+ /// then `Ok(())` is returned. If the receiving end was deallocated before
+ /// this function was called, however, then `Err` is returned with the value
+ /// provided.
+ pub fn send(self, val: T) -> Result<(), T> {
+ if let Some(inner) = self.inner.upgrade() {
+ inner.borrow_mut().value = Some(val);
+ Ok(())
+ } else {
+ Err(val)
+ }
+ }
+
+ /// Polls this `Sender` half to detect whether the `Receiver` this has
+ /// paired with has gone away.
+ ///
+ /// This function can be used to learn about when the `Receiver` (consumer)
+ /// half has gone away and nothing will be able to receive a message sent
+ /// from `complete`.
+ ///
+ /// Like `Future::poll`, this function will panic if it's not called from
+ /// within the context of a task. In other words, this should only ever be
+ /// called from inside another future.
+ ///
+ /// If `Ready` is returned then it means that the `Receiver` has disappeared
+ /// and the result this `Sender` would otherwise produce should no longer
+ /// be produced.
+ ///
+ /// If `NotReady` is returned then the `Receiver` is still alive and may be
+ /// able to receive a message if sent. The current task, however, is
+ /// scheduled to receive a notification if the corresponding `Receiver` goes
+ /// away.
+ pub fn poll_cancel(&mut self) -> Poll<(), ()> {
+ match self.inner.upgrade() {
+ Some(inner) => {
+ inner.borrow_mut().tx_task = Some(task::current());
+ Ok(Async::NotReady)
+ }
+ None => Ok(().into()),
+ }
+ }
+
+ /// Tests to see whether this `Sender`'s corresponding `Receiver`
+ /// has gone away.
+ ///
+ /// This function can be used to learn about when the `Receiver` (consumer)
+ /// half has gone away and nothing will be able to receive a message sent
+ /// from `send`.
+ ///
+ /// Note that this function is intended to *not* be used in the context of a
+ /// future. If you're implementing a future you probably want to call the
+ /// `poll_cancel` function which will block the current task if the
+ /// cancellation hasn't happened yet. This can be useful when working on a
+ /// non-futures related thread, though, which would otherwise panic if
+ /// `poll_cancel` were called.
+ pub fn is_canceled(&self) -> bool {
+ !self.inner.upgrade().is_some()
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ let inner = match self.inner.upgrade() {
+ Some(inner) => inner,
+ None => return,
+ };
+ let rx_task = {
+ let mut borrow = inner.borrow_mut();
+ borrow.tx_task.take();
+ borrow.rx_task.take()
+ };
+ if let Some(task) = rx_task {
+ task.notify();
+ }
+ }
+}
+
+impl<T> Receiver<T> {
+ /// Gracefully close this receiver, preventing sending any future messages.
+ ///
+ /// Any `send` operation which happens after this method returns is
+ /// guaranteed to fail. Once this method is called the normal `poll` method
+ /// can be used to determine whether a message was actually sent or not. If
+ /// `Canceled` is returned from `poll` then no message was sent.
+ pub fn close(&mut self) {
+ let (item, task) = match self.state {
+ State::Open(ref inner) => {
+ let mut inner = inner.borrow_mut();
+ drop(inner.rx_task.take());
+ (inner.value.take(), inner.tx_task.take())
+ }
+ State::Closed(_) => return,
+ };
+ self.state = State::Closed(item);
+ if let Some(task) = task {
+ task.notify();
+ }
+ }
+}
+
+impl<T> Future for Receiver<T> {
+ type Item = T;
+ type Error = Canceled;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let inner = match self.state {
+ State::Open(ref mut inner) => inner,
+ State::Closed(ref mut item) => {
+ match item.take() {
+ Some(item) => return Ok(item.into()),
+ None => return Err(Canceled),
+ }
+ }
+ };
+
+ // If we've got a value, then skip the logic below as we're done.
+ if let Some(val) = inner.borrow_mut().value.take() {
+ return Ok(Async::Ready(val))
+ }
+
+ // If we can get mutable access, then the sender has gone away. We
+ // didn't see a value above, so we're canceled. Otherwise we park
+ // our task and wait for a value to come in.
+ if Rc::get_mut(inner).is_some() {
+ Err(Canceled)
+ } else {
+ inner.borrow_mut().rx_task = Some(task::current());
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ self.close();
+ }
+}
+
+/// Handle returned from the `spawn` function.
+///
+/// This handle is a future representing the completion of a different future on
+/// a separate executor. Created through the `oneshot::spawn` function this
+/// handle will resolve when the future provided to `spawn` resolves on the
+/// `Executor` instance provided to that function.
+///
+/// If this handle is dropped then the future will automatically no longer be
+/// polled and is scheduled to be dropped. This can be canceled with the
+/// `forget` function, however.
+pub struct SpawnHandle<T, E> {
+ rx: Receiver<Result<T, E>>,
+ keep_running: Rc<Cell<bool>>,
+}
+
+/// Type of future which `Spawn` instances below must be able to spawn.
+pub struct Execute<F: Future> {
+ future: F,
+ tx: Option<Sender<Result<F::Item, F::Error>>>,
+ keep_running: Rc<Cell<bool>>,
+}
+
+/// Spawns a `future` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the completion of the future.
+///
+/// The `SpawnHandle` returned is a future that is a proxy for `future` itself.
+/// When `future` completes on `executor` then the `SpawnHandle` will itself be
+/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is
+/// thus not safe to send across threads.
+///
+/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is
+/// not desired then the `SpawnHandle::forget` function can be used to continue
+/// running the future to completion.
+///
+/// # Panics
+///
+/// This function will panic if the instance of `Spawn` provided is unable to
+/// spawn the `future` provided.
+///
+/// If the provided instance of `Spawn` does not actually run `future` to
+/// completion, then the returned handle may panic when polled. Typically this
+/// is not a problem, though, as most instances of `Spawn` will run futures to
+/// completion.
+pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error>
+ where F: Future,
+ E: Executor<Execute<F>>,
+{
+ let flag = Rc::new(Cell::new(false));
+ let (tx, rx) = channel();
+ executor.execute(Execute {
+ future: future,
+ tx: Some(tx),
+ keep_running: flag.clone(),
+ }).expect("failed to spawn future");
+ SpawnHandle {
+ rx: rx,
+ keep_running: flag,
+ }
+}
+
+/// Spawns a function `f` onto the `Spawn` instance provided `s`.
+///
+/// For more information see the `spawn` function in this module. This function
+/// is just a thin wrapper around `spawn` which will execute the closure on the
+/// executor provided and then complete the future that the closure returns.
+pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error>
+ where F: FnOnce() -> R,
+ R: IntoFuture,
+ E: Executor<Execute<Lazy<F, R>>>,
+{
+ spawn(lazy(f), executor)
+}
+
+impl<T, E> SpawnHandle<T, E> {
+ /// Drop this future without canceling the underlying future.
+ ///
+ /// When `SpawnHandle` is dropped, the spawned future will be canceled as
+ /// well if the future hasn't already resolved. This function can be used
+ /// when to drop this future but keep executing the underlying future.
+ pub fn forget(self) {
+ self.keep_running.set(true);
+ }
+}
+
+impl<T, E> Future for SpawnHandle<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<T, E> {
+ match self.rx.poll() {
+ Ok(Async::Ready(Ok(t))) => Ok(t.into()),
+ Ok(Async::Ready(Err(e))) => Err(e),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(_) => panic!("future was canceled before completion"),
+ }
+ }
+}
+
+impl<T: fmt::Debug, E: fmt::Debug> fmt::Debug for SpawnHandle<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("SpawnHandle")
+ .finish()
+ }
+}
+
+impl<F: Future> Future for Execute<F> {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ // If we're canceled then we may want to bail out early.
+ //
+ // If the `forget` function was called, though, then we keep going.
+ if self.tx.as_mut().unwrap().poll_cancel().unwrap().is_ready() {
+ if !self.keep_running.get() {
+ return Ok(().into())
+ }
+ }
+
+ let result = match self.future.poll() {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Ok(Async::Ready(t)) => Ok(t),
+ Err(e) => Err(e),
+ };
+ drop(self.tx.take().unwrap().send(result));
+ Ok(().into())
+ }
+}
+
+impl<F: Future + fmt::Debug> fmt::Debug for Execute<F> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Execute")
+ .field("future", &self.future)
+ .finish()
+ }
+}