diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/unsync | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/unsync')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/unsync/mod.rs | 7 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/src/unsync/mpsc.rs | 474 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/src/unsync/oneshot.rs | 351 |
3 files changed, 832 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/unsync/mod.rs b/third_party/rust/futures-0.1.31/src/unsync/mod.rs new file mode 100644 index 0000000000..aaa5a707ba --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/unsync/mod.rs @@ -0,0 +1,7 @@ +//! Future-aware single-threaded synchronization +//! +//! This module contains similar abstractions to `sync`, for communications +//! between tasks on the same thread only. + +pub mod mpsc; +pub mod oneshot; diff --git a/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs new file mode 100644 index 0000000000..ba0d52dc98 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs @@ -0,0 +1,474 @@ +//! A multi-producer, single-consumer, futures-aware, FIFO queue with back +//! pressure, for use communicating between tasks on the same thread. +//! +//! These queues are the same as those in `futures::sync`, except they're not +//! intended to be sent across threads. + +use std::any::Any; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::error::Error; +use std::fmt; +use std::mem; +use std::rc::{Rc, Weak}; + +use task::{self, Task}; +use future::Executor; +use sink::SendAll; +use resultstream::{self, Results}; +use unsync::oneshot; +use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream}; + +/// Creates a bounded in-memory channel with buffered storage. +/// +/// This method creates concrete implementations of the `Stream` and `Sink` +/// traits which can be used to communicate a stream of values between tasks +/// with backpressure. The channel capacity is exactly `buffer`. On average, +/// sending a message through this channel performs no dynamic allocation. +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + channel_(Some(buffer)) +} + +fn channel_<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) { + let shared = Rc::new(RefCell::new(Shared { + buffer: VecDeque::new(), + capacity: buffer, + blocked_senders: VecDeque::new(), + blocked_recv: None, + })); + let sender = Sender { shared: Rc::downgrade(&shared) }; + let receiver = Receiver { state: State::Open(shared) }; + (sender, receiver) +} + +#[derive(Debug)] +struct Shared<T> { + buffer: VecDeque<T>, + capacity: Option<usize>, + blocked_senders: VecDeque<Task>, + blocked_recv: Option<Task>, +} + +/// The transmission end of a channel. +/// +/// This is created by the `channel` function. +#[derive(Debug)] +pub struct Sender<T> { + shared: Weak<RefCell<Shared<T>>>, +} + +impl<T> Sender<T> { + fn do_send(&self, msg: T) -> StartSend<T, SendError<T>> { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + None => return Err(SendError(msg)), // receiver was dropped + }; + let mut shared = shared.borrow_mut(); + + match shared.capacity { + Some(capacity) if shared.buffer.len() == capacity => { + shared.blocked_senders.push_back(task::current()); + Ok(AsyncSink::NotReady(msg)) + } + _ => { + shared.buffer.push_back(msg); + if let Some(task) = shared.blocked_recv.take() { + task.notify(); + } + Ok(AsyncSink::Ready) + } + } + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Sender { shared: self.shared.clone() } + } +} + +impl<T> Sink for Sender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.do_send(msg) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + None => return, + }; + // The number of existing `Weak` indicates if we are possibly the last + // `Sender`. If we are the last, we possibly must notify a blocked + // `Receiver`. `self.shared` is always one of the `Weak` to this shared + // data. Therefore the smallest possible Rc::weak_count(&shared) is 1. + if Rc::weak_count(&shared) == 1 { + if let Some(task) = shared.borrow_mut().blocked_recv.take() { + // Wake up receiver as its stream has ended + task.notify(); + } + } + } +} + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is created by the `channel` function. +#[derive(Debug)] +pub struct Receiver<T> { + state: State<T>, +} + +/// Possible states of a receiver. We're either Open (can receive more messages) +/// or we're closed with a list of messages we have left to receive. +#[derive(Debug)] +enum State<T> { + Open(Rc<RefCell<Shared<T>>>), + Closed(VecDeque<T>), +} + +impl<T> Receiver<T> { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + let (blockers, items) = match self.state { + State::Open(ref state) => { + let mut state = state.borrow_mut(); + let items = mem::replace(&mut state.buffer, VecDeque::new()); + let blockers = mem::replace(&mut state.blocked_senders, VecDeque::new()); + (blockers, items) + } + State::Closed(_) => return, + }; + self.state = State::Closed(items); + for task in blockers { + task.notify(); + } + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let me = match self.state { + State::Open(ref mut me) => me, + State::Closed(ref mut items) => { + return Ok(Async::Ready(items.pop_front())) + } + }; + + if let Some(shared) = Rc::get_mut(me) { + // All senders have been dropped, so drain the buffer and end the + // stream. + return Ok(Async::Ready(shared.borrow_mut().buffer.pop_front())); + } + + let mut shared = me.borrow_mut(); + if let Some(msg) = shared.buffer.pop_front() { + if let Some(task) = shared.blocked_senders.pop_front() { + drop(shared); + task.notify(); + } + Ok(Async::Ready(Some(msg))) + } else { + shared.blocked_recv = Some(task::current()); + Ok(Async::NotReady) + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + self.close(); + } +} + +/// The transmission end of an unbounded channel. +/// +/// This is created by the `unbounded` function. +#[derive(Debug)] +pub struct UnboundedSender<T>(Sender<T>); + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> Self { + UnboundedSender(self.0.clone()) + } +} + +impl<T> Sink for UnboundedSender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.0.start_send(msg) + } + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<'a, T> Sink for &'a UnboundedSender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.0.do_send(msg) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> UnboundedSender<T> { + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + #[deprecated(note = "renamed to `unbounded_send`")] + #[doc(hidden)] + pub fn send(&self, msg: T) -> Result<(), SendError<T>> { + self.unbounded_send(msg) + } + + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> { + let shared = match self.0.shared.upgrade() { + Some(shared) => shared, + None => return Err(SendError(msg)), + }; + let mut shared = shared.borrow_mut(); + shared.buffer.push_back(msg); + if let Some(task) = shared.blocked_recv.take() { + drop(shared); + task.notify(); + } + Ok(()) + } +} + +/// The receiving end of an unbounded channel. +/// +/// This is created by the `unbounded` function. +#[derive(Debug)] +pub struct UnboundedReceiver<T>(Receiver<T>); + +impl<T> UnboundedReceiver<T> { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.0.close(); + } +} + +impl<T> Stream for UnboundedReceiver<T> { + type Item = T; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.0.poll() + } +} + +/// Creates an unbounded in-memory channel with buffered storage. +/// +/// Identical semantics to `channel`, except with no limit to buffer size. +pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let (send, recv) = channel_(None); + (UnboundedSender(send), UnboundedReceiver(recv)) +} + +/// Error type for sending, used when the receiving end of a channel is +/// dropped +pub struct SendError<T>(T); + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SendError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "send failed because receiver is gone") + } +} + +impl<T: Any> Error for SendError<T> { + fn description(&self) -> &str { + "send failed because receiver is gone" + } +} + +impl<T> SendError<T> { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} + +/// Handle returned from the `spawn` function. +/// +/// This handle is a stream that proxies a stream on a separate `Executor`. +/// Created through the `mpsc::spawn` function, this handle will produce +/// the same values as the proxied stream, as they are produced in the executor, +/// and uses a limited buffer to exert back-pressure on the remote stream. +/// +/// If this handle is dropped, then the stream will no longer be polled and is +/// scheduled to be dropped. +pub struct SpawnHandle<Item, Error> { + inner: Receiver<Result<Item, Error>>, + _cancel_tx: oneshot::Sender<()>, +} + +/// Type of future which `Executor` instances must be able to execute for `spawn`. +pub struct Execute<S: Stream> { + inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>, + cancel_rx: oneshot::Receiver<()>, +} + +/// Spawns a `stream` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the remote stream. +/// +/// The `stream` will be canceled if the `SpawnHandle` is dropped. +/// +/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. +/// When `stream` has additional items available, then the `SpawnHandle` +/// will have those same items available. +/// +/// At most `buffer + 1` elements will be buffered at a time. If the buffer +/// is full, then `stream` will stop progressing until more space is available. +/// This allows the `SpawnHandle` to exert backpressure on the `stream`. +/// +/// # Panics +/// +/// This function will panic if `executor` is unable spawn a `Future` containing +/// the entirety of the `stream`. +pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error> + where S: Stream, + E: Executor<Execute<S>> +{ + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (tx, rx) = channel(buffer); + executor.execute(Execute { + inner: tx.send_all(resultstream::new(stream)), + cancel_rx: cancel_rx, + }).expect("failed to spawn stream"); + SpawnHandle { + inner: rx, + _cancel_tx: cancel_tx, + } +} + +/// Spawns a `stream` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the remote stream, with unbounded buffering. +/// +/// The `stream` will be canceled if the `SpawnHandle` is dropped. +/// +/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. +/// When `stream` has additional items available, then the `SpawnHandle` +/// will have those same items available. +/// +/// An unbounded buffer is used, which means that values will be buffered as +/// fast as `stream` can produce them, without any backpressure. Therefore, if +/// `stream` is an infinite stream, it can use an unbounded amount of memory, and +/// potentially hog CPU resources. In particular, if `stream` is infinite +/// and doesn't ever yield (by returning `Async::NotReady` from `poll`), it +/// will result in an infinite loop. +/// +/// # Panics +/// +/// This function will panic if `executor` is unable spawn a `Future` containing +/// the entirety of the `stream`. +pub fn spawn_unbounded<S,E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error> + where S: Stream, + E: Executor<Execute<S>> +{ + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (tx, rx) = channel_(None); + executor.execute(Execute { + inner: tx.send_all(resultstream::new(stream)), + cancel_rx: cancel_rx, + }).expect("failed to spawn stream"); + SpawnHandle { + inner: rx, + _cancel_tx: cancel_tx, + } +} + +impl<I, E> Stream for SpawnHandle<I, E> { + type Item = I; + type Error = E; + + fn poll(&mut self) -> Poll<Option<I>, E> { + match self.inner.poll() { + Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))), + Ok(Async::Ready(Some(Err(e)))) => Err(e), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => unreachable!("mpsc::Receiver should never return Err"), + } + } +} + +impl<I, E> fmt::Debug for SpawnHandle<I, E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl<S: Stream> Future for Execute<S> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + match self.cancel_rx.poll() { + Ok(Async::NotReady) => (), + _ => return Ok(Async::Ready(())), + } + match self.inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + _ => Ok(Async::Ready(())) + } + } +} + +impl<S: Stream> fmt::Debug for Execute<S> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .finish() + } +} diff --git a/third_party/rust/futures-0.1.31/src/unsync/oneshot.rs b/third_party/rust/futures-0.1.31/src/unsync/oneshot.rs new file mode 100644 index 0000000000..7ae2890f9e --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/unsync/oneshot.rs @@ -0,0 +1,351 @@ +//! A one-shot, futures-aware channel +//! +//! This channel is similar to that in `sync::oneshot` but cannot be sent across +//! threads. + +use std::cell::{Cell, RefCell}; +use std::fmt; +use std::rc::{Rc, Weak}; + +use {Future, Poll, Async}; +use future::{Executor, IntoFuture, Lazy, lazy}; +use task::{self, Task}; + +/// Creates a new futures-aware, one-shot channel. +/// +/// This function is the same as `sync::oneshot::channel` except that the +/// returned values cannot be sent across threads. +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Rc::new(RefCell::new(Inner { + value: None, + tx_task: None, + rx_task: None, + })); + let tx = Sender { + inner: Rc::downgrade(&inner), + }; + let rx = Receiver { + state: State::Open(inner), + }; + (tx, rx) +} + +/// Represents the completion half of a oneshot through which the result of a +/// computation is signaled. +/// +/// This is created by the `unsync::oneshot::channel` function and is equivalent +/// in functionality to `sync::oneshot::Sender` except that it cannot be sent +/// across threads. +#[derive(Debug)] +pub struct Sender<T> { + inner: Weak<RefCell<Inner<T>>>, +} + +/// A future representing the completion of a computation happening elsewhere in +/// memory. +/// +/// This is created by the `unsync::oneshot::channel` function and is equivalent +/// in functionality to `sync::oneshot::Receiver` except that it cannot be sent +/// across threads. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Receiver<T> { + state: State<T>, +} + +#[derive(Debug)] +enum State<T> { + Open(Rc<RefCell<Inner<T>>>), + Closed(Option<T>), +} + +pub use sync::oneshot::Canceled; + +#[derive(Debug)] +struct Inner<T> { + value: Option<T>, + tx_task: Option<Task>, + rx_task: Option<Task>, +} + +impl<T> Sender<T> { + /// Completes this oneshot with a successful result. + /// + /// This function will consume `self` and indicate to the other end, the + /// `Receiver`, that the error provided is the result of the computation this + /// represents. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If the receiving end was deallocated before + /// this function was called, however, then `Err` is returned with the value + /// provided. + pub fn send(self, val: T) -> Result<(), T> { + if let Some(inner) = self.inner.upgrade() { + inner.borrow_mut().value = Some(val); + Ok(()) + } else { + Err(val) + } + } + + /// Polls this `Sender` half to detect whether the `Receiver` this has + /// paired with has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `complete`. + /// + /// Like `Future::poll`, this function will panic if it's not called from + /// within the context of a task. In other words, this should only ever be + /// called from inside another future. + /// + /// If `Ready` is returned then it means that the `Receiver` has disappeared + /// and the result this `Sender` would otherwise produce should no longer + /// be produced. + /// + /// If `NotReady` is returned then the `Receiver` is still alive and may be + /// able to receive a message if sent. The current task, however, is + /// scheduled to receive a notification if the corresponding `Receiver` goes + /// away. + pub fn poll_cancel(&mut self) -> Poll<(), ()> { + match self.inner.upgrade() { + Some(inner) => { + inner.borrow_mut().tx_task = Some(task::current()); + Ok(Async::NotReady) + } + None => Ok(().into()), + } + } + + /// Tests to see whether this `Sender`'s corresponding `Receiver` + /// has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `send`. + /// + /// Note that this function is intended to *not* be used in the context of a + /// future. If you're implementing a future you probably want to call the + /// `poll_cancel` function which will block the current task if the + /// cancellation hasn't happened yet. This can be useful when working on a + /// non-futures related thread, though, which would otherwise panic if + /// `poll_cancel` were called. + pub fn is_canceled(&self) -> bool { + !self.inner.upgrade().is_some() + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + let inner = match self.inner.upgrade() { + Some(inner) => inner, + None => return, + }; + let rx_task = { + let mut borrow = inner.borrow_mut(); + borrow.tx_task.take(); + borrow.rx_task.take() + }; + if let Some(task) = rx_task { + task.notify(); + } + } +} + +impl<T> Receiver<T> { + /// Gracefully close this receiver, preventing sending any future messages. + /// + /// Any `send` operation which happens after this method returns is + /// guaranteed to fail. Once this method is called the normal `poll` method + /// can be used to determine whether a message was actually sent or not. If + /// `Canceled` is returned from `poll` then no message was sent. + pub fn close(&mut self) { + let (item, task) = match self.state { + State::Open(ref inner) => { + let mut inner = inner.borrow_mut(); + drop(inner.rx_task.take()); + (inner.value.take(), inner.tx_task.take()) + } + State::Closed(_) => return, + }; + self.state = State::Closed(item); + if let Some(task) = task { + task.notify(); + } + } +} + +impl<T> Future for Receiver<T> { + type Item = T; + type Error = Canceled; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let inner = match self.state { + State::Open(ref mut inner) => inner, + State::Closed(ref mut item) => { + match item.take() { + Some(item) => return Ok(item.into()), + None => return Err(Canceled), + } + } + }; + + // If we've got a value, then skip the logic below as we're done. + if let Some(val) = inner.borrow_mut().value.take() { + return Ok(Async::Ready(val)) + } + + // If we can get mutable access, then the sender has gone away. We + // didn't see a value above, so we're canceled. Otherwise we park + // our task and wait for a value to come in. + if Rc::get_mut(inner).is_some() { + Err(Canceled) + } else { + inner.borrow_mut().rx_task = Some(task::current()); + Ok(Async::NotReady) + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + self.close(); + } +} + +/// Handle returned from the `spawn` function. +/// +/// This handle is a future representing the completion of a different future on +/// a separate executor. Created through the `oneshot::spawn` function this +/// handle will resolve when the future provided to `spawn` resolves on the +/// `Executor` instance provided to that function. +/// +/// If this handle is dropped then the future will automatically no longer be +/// polled and is scheduled to be dropped. This can be canceled with the +/// `forget` function, however. +pub struct SpawnHandle<T, E> { + rx: Receiver<Result<T, E>>, + keep_running: Rc<Cell<bool>>, +} + +/// Type of future which `Spawn` instances below must be able to spawn. +pub struct Execute<F: Future> { + future: F, + tx: Option<Sender<Result<F::Item, F::Error>>>, + keep_running: Rc<Cell<bool>>, +} + +/// Spawns a `future` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the completion of the future. +/// +/// The `SpawnHandle` returned is a future that is a proxy for `future` itself. +/// When `future` completes on `executor` then the `SpawnHandle` will itself be +/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is +/// thus not safe to send across threads. +/// +/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is +/// not desired then the `SpawnHandle::forget` function can be used to continue +/// running the future to completion. +/// +/// # Panics +/// +/// This function will panic if the instance of `Spawn` provided is unable to +/// spawn the `future` provided. +/// +/// If the provided instance of `Spawn` does not actually run `future` to +/// completion, then the returned handle may panic when polled. Typically this +/// is not a problem, though, as most instances of `Spawn` will run futures to +/// completion. +pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error> + where F: Future, + E: Executor<Execute<F>>, +{ + let flag = Rc::new(Cell::new(false)); + let (tx, rx) = channel(); + executor.execute(Execute { + future: future, + tx: Some(tx), + keep_running: flag.clone(), + }).expect("failed to spawn future"); + SpawnHandle { + rx: rx, + keep_running: flag, + } +} + +/// Spawns a function `f` onto the `Spawn` instance provided `s`. +/// +/// For more information see the `spawn` function in this module. This function +/// is just a thin wrapper around `spawn` which will execute the closure on the +/// executor provided and then complete the future that the closure returns. +pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error> + where F: FnOnce() -> R, + R: IntoFuture, + E: Executor<Execute<Lazy<F, R>>>, +{ + spawn(lazy(f), executor) +} + +impl<T, E> SpawnHandle<T, E> { + /// Drop this future without canceling the underlying future. + /// + /// When `SpawnHandle` is dropped, the spawned future will be canceled as + /// well if the future hasn't already resolved. This function can be used + /// when to drop this future but keep executing the underlying future. + pub fn forget(self) { + self.keep_running.set(true); + } +} + +impl<T, E> Future for SpawnHandle<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + match self.rx.poll() { + Ok(Async::Ready(Ok(t))) => Ok(t.into()), + Ok(Async::Ready(Err(e))) => Err(e), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => panic!("future was canceled before completion"), + } + } +} + +impl<T: fmt::Debug, E: fmt::Debug> fmt::Debug for SpawnHandle<T, E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl<F: Future> Future for Execute<F> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + // If we're canceled then we may want to bail out early. + // + // If the `forget` function was called, though, then we keep going. + if self.tx.as_mut().unwrap().poll_cancel().unwrap().is_ready() { + if !self.keep_running.get() { + return Ok(().into()) + } + } + + let result = match self.future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(t)) => Ok(t), + Err(e) => Err(e), + }; + drop(self.tx.take().unwrap().send(result)); + Ok(().into()) + } +} + +impl<F: Future + fmt::Debug> fmt::Debug for Execute<F> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .field("future", &self.future) + .finish() + } +} |