//! A multi-producer, single-consumer, futures-aware, FIFO queue with back
//! pressure, for use communicating between tasks on the same thread.
//!
//! These queues are the same as those in `futures::sync`, except they're not
//! intended to be sent across threads.

use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::mem;
use std::rc::{Rc, Weak};

use task::{self, Task};
use future::Executor;
use sink::SendAll;
use resultstream::{self, Results};
use unsync::oneshot;
use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};

/// Creates a bounded in-memory channel with buffered storage.
///
/// This method creates concrete implementations of the `Stream` and `Sink`
/// traits which can be used to communicate a stream of values between tasks
/// with backpressure. The channel capacity is exactly `buffer`. On average,
/// sending a message through this channel performs no dynamic allocation.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    channel_(Some(buffer))
}

/// Shared constructor for the bounded and unbounded flavors:
/// `capacity == None` means "no limit".
fn channel_<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
    let shared = Rc::new(RefCell::new(Shared {
        buffer: VecDeque::new(),
        capacity: buffer,
        blocked_senders: VecDeque::new(),
        blocked_recv: None,
    }));
    // Senders hold only a `Weak`; when the receiver (sole strong owner via
    // `State::Open`) goes away, sends start failing with `SendError`.
    let sender = Sender { shared: Rc::downgrade(&shared) };
    let receiver = Receiver { state: State::Open(shared) };
    (sender, receiver)
}

/// State shared between all `Sender`s and the `Receiver` of one channel.
#[derive(Debug)]
struct Shared<T> {
    // Queued messages awaiting the receiver.
    buffer: VecDeque<T>,
    // `Some(n)` caps `buffer` at `n` items; `None` means unbounded.
    capacity: Option<usize>,
    // Tasks of senders parked because the buffer was full, in FIFO order.
    blocked_senders: VecDeque<Task>,
    // Task of the receiver, if it is parked waiting for a message.
    blocked_recv: Option<Task>,
}

/// The transmission end of a channel.
///
/// This is created by the `channel` function.
#[derive(Debug)] pub struct Sender { shared: Weak>>, } impl Sender { fn do_send(&self, msg: T) -> StartSend> { let shared = match self.shared.upgrade() { Some(shared) => shared, None => return Err(SendError(msg)), // receiver was dropped }; let mut shared = shared.borrow_mut(); match shared.capacity { Some(capacity) if shared.buffer.len() == capacity => { shared.blocked_senders.push_back(task::current()); Ok(AsyncSink::NotReady(msg)) } _ => { shared.buffer.push_back(msg); if let Some(task) = shared.blocked_recv.take() { task.notify(); } Ok(AsyncSink::Ready) } } } } impl Clone for Sender { fn clone(&self) -> Self { Sender { shared: self.shared.clone() } } } impl Sink for Sender { type SinkItem = T; type SinkError = SendError; fn start_send(&mut self, msg: T) -> StartSend> { self.do_send(msg) } fn poll_complete(&mut self) -> Poll<(), SendError> { Ok(Async::Ready(())) } fn close(&mut self) -> Poll<(), SendError> { Ok(Async::Ready(())) } } impl Drop for Sender { fn drop(&mut self) { let shared = match self.shared.upgrade() { Some(shared) => shared, None => return, }; // The number of existing `Weak` indicates if we are possibly the last // `Sender`. If we are the last, we possibly must notify a blocked // `Receiver`. `self.shared` is always one of the `Weak` to this shared // data. Therefore the smallest possible Rc::weak_count(&shared) is 1. if Rc::weak_count(&shared) == 1 { if let Some(task) = shared.borrow_mut().blocked_recv.take() { // Wake up receiver as its stream has ended task.notify(); } } } } /// The receiving end of a channel which implements the `Stream` trait. /// /// This is created by the `channel` function. #[derive(Debug)] pub struct Receiver { state: State, } /// Possible states of a receiver. We're either Open (can receive more messages) /// or we're closed with a list of messages we have left to receive. 
#[derive(Debug)] enum State { Open(Rc>>), Closed(VecDeque), } impl Receiver { /// Closes the receiving half /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. pub fn close(&mut self) { let (blockers, items) = match self.state { State::Open(ref state) => { let mut state = state.borrow_mut(); let items = mem::replace(&mut state.buffer, VecDeque::new()); let blockers = mem::replace(&mut state.blocked_senders, VecDeque::new()); (blockers, items) } State::Closed(_) => return, }; self.state = State::Closed(items); for task in blockers { task.notify(); } } } impl Stream for Receiver { type Item = T; type Error = (); fn poll(&mut self) -> Poll, Self::Error> { let me = match self.state { State::Open(ref mut me) => me, State::Closed(ref mut items) => { return Ok(Async::Ready(items.pop_front())) } }; if let Some(shared) = Rc::get_mut(me) { // All senders have been dropped, so drain the buffer and end the // stream. return Ok(Async::Ready(shared.borrow_mut().buffer.pop_front())); } let mut shared = me.borrow_mut(); if let Some(msg) = shared.buffer.pop_front() { if let Some(task) = shared.blocked_senders.pop_front() { drop(shared); task.notify(); } Ok(Async::Ready(Some(msg))) } else { shared.blocked_recv = Some(task::current()); Ok(Async::NotReady) } } } impl Drop for Receiver { fn drop(&mut self) { self.close(); } } /// The transmission end of an unbounded channel. /// /// This is created by the `unbounded` function. 
#[derive(Debug)] pub struct UnboundedSender(Sender); impl Clone for UnboundedSender { fn clone(&self) -> Self { UnboundedSender(self.0.clone()) } } impl Sink for UnboundedSender { type SinkItem = T; type SinkError = SendError; fn start_send(&mut self, msg: T) -> StartSend> { self.0.start_send(msg) } fn poll_complete(&mut self) -> Poll<(), SendError> { Ok(Async::Ready(())) } fn close(&mut self) -> Poll<(), SendError> { Ok(Async::Ready(())) } } impl<'a, T> Sink for &'a UnboundedSender { type SinkItem = T; type SinkError = SendError; fn start_send(&mut self, msg: T) -> StartSend> { self.0.do_send(msg) } fn poll_complete(&mut self) -> Poll<(), SendError> { Ok(Async::Ready(())) } fn close(&mut self) -> Poll<(), SendError> { Ok(Async::Ready(())) } } impl UnboundedSender { /// Sends the provided message along this channel. /// /// This is an unbounded sender, so this function differs from `Sink::send` /// by ensuring the return type reflects that the channel is always ready to /// receive messages. #[deprecated(note = "renamed to `unbounded_send`")] #[doc(hidden)] pub fn send(&self, msg: T) -> Result<(), SendError> { self.unbounded_send(msg) } /// Sends the provided message along this channel. /// /// This is an unbounded sender, so this function differs from `Sink::send` /// by ensuring the return type reflects that the channel is always ready to /// receive messages. pub fn unbounded_send(&self, msg: T) -> Result<(), SendError> { let shared = match self.0.shared.upgrade() { Some(shared) => shared, None => return Err(SendError(msg)), }; let mut shared = shared.borrow_mut(); shared.buffer.push_back(msg); if let Some(task) = shared.blocked_recv.take() { drop(shared); task.notify(); } Ok(()) } } /// The receiving end of an unbounded channel. /// /// This is created by the `unbounded` function. 
#[derive(Debug)] pub struct UnboundedReceiver(Receiver); impl UnboundedReceiver { /// Closes the receiving half /// /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. pub fn close(&mut self) { self.0.close(); } } impl Stream for UnboundedReceiver { type Item = T; type Error = (); fn poll(&mut self) -> Poll, Self::Error> { self.0.poll() } } /// Creates an unbounded in-memory channel with buffered storage. /// /// Identical semantics to `channel`, except with no limit to buffer size. pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { let (send, recv) = channel_(None); (UnboundedSender(send), UnboundedReceiver(recv)) } /// Error type for sending, used when the receiving end of a channel is /// dropped pub struct SendError(T); impl fmt::Debug for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_tuple("SendError") .field(&"...") .finish() } } impl fmt::Display for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "send failed because receiver is gone") } } impl Error for SendError { fn description(&self) -> &str { "send failed because receiver is gone" } } impl SendError { /// Returns the message that was attempted to be sent but failed. pub fn into_inner(self) -> T { self.0 } } /// Handle returned from the `spawn` function. /// /// This handle is a stream that proxies a stream on a separate `Executor`. /// Created through the `mpsc::spawn` function, this handle will produce /// the same values as the proxied stream, as they are produced in the executor, /// and uses a limited buffer to exert back-pressure on the remote stream. /// /// If this handle is dropped, then the stream will no longer be polled and is /// scheduled to be dropped. pub struct SpawnHandle { inner: Receiver>, _cancel_tx: oneshot::Sender<()>, } /// Type of future which `Executor` instances must be able to execute for `spawn`. 
pub struct Execute { inner: SendAll>, Results>>>, cancel_rx: oneshot::Receiver<()>, } /// Spawns a `stream` onto the instance of `Executor` provided, `executor`, /// returning a handle representing the remote stream. /// /// The `stream` will be canceled if the `SpawnHandle` is dropped. /// /// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. /// When `stream` has additional items available, then the `SpawnHandle` /// will have those same items available. /// /// At most `buffer + 1` elements will be buffered at a time. If the buffer /// is full, then `stream` will stop progressing until more space is available. /// This allows the `SpawnHandle` to exert backpressure on the `stream`. /// /// # Panics /// /// This function will panic if `executor` is unable spawn a `Future` containing /// the entirety of the `stream`. pub fn spawn(stream: S, executor: &E, buffer: usize) -> SpawnHandle where S: Stream, E: Executor> { let (cancel_tx, cancel_rx) = oneshot::channel(); let (tx, rx) = channel(buffer); executor.execute(Execute { inner: tx.send_all(resultstream::new(stream)), cancel_rx: cancel_rx, }).expect("failed to spawn stream"); SpawnHandle { inner: rx, _cancel_tx: cancel_tx, } } /// Spawns a `stream` onto the instance of `Executor` provided, `executor`, /// returning a handle representing the remote stream, with unbounded buffering. /// /// The `stream` will be canceled if the `SpawnHandle` is dropped. /// /// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. /// When `stream` has additional items available, then the `SpawnHandle` /// will have those same items available. /// /// An unbounded buffer is used, which means that values will be buffered as /// fast as `stream` can produce them, without any backpressure. Therefore, if /// `stream` is an infinite stream, it can use an unbounded amount of memory, and /// potentially hog CPU resources. 
In particular, if `stream` is infinite /// and doesn't ever yield (by returning `Async::NotReady` from `poll`), it /// will result in an infinite loop. /// /// # Panics /// /// This function will panic if `executor` is unable spawn a `Future` containing /// the entirety of the `stream`. pub fn spawn_unbounded(stream: S, executor: &E) -> SpawnHandle where S: Stream, E: Executor> { let (cancel_tx, cancel_rx) = oneshot::channel(); let (tx, rx) = channel_(None); executor.execute(Execute { inner: tx.send_all(resultstream::new(stream)), cancel_rx: cancel_rx, }).expect("failed to spawn stream"); SpawnHandle { inner: rx, _cancel_tx: cancel_tx, } } impl Stream for SpawnHandle { type Item = I; type Error = E; fn poll(&mut self) -> Poll, E> { match self.inner.poll() { Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))), Ok(Async::Ready(Some(Err(e)))) => Err(e), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => unreachable!("mpsc::Receiver should never return Err"), } } } impl fmt::Debug for SpawnHandle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("SpawnHandle") .finish() } } impl Future for Execute { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { match self.cancel_rx.poll() { Ok(Async::NotReady) => (), _ => return Ok(Async::Ready(())), } match self.inner.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), _ => Ok(Async::Ready(())) } } } impl fmt::Debug for Execute { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Execute") .finish() } }