diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/unsync/mpsc.rs | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/unsync/mpsc.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/unsync/mpsc.rs | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs new file mode 100644 index 0000000000..ba0d52dc98 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs @@ -0,0 +1,474 @@ +//! A multi-producer, single-consumer, futures-aware, FIFO queue with back +//! pressure, for use communicating between tasks on the same thread. +//! +//! These queues are the same as those in `futures::sync`, except they're not +//! intended to be sent across threads. + +use std::any::Any; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::error::Error; +use std::fmt; +use std::mem; +use std::rc::{Rc, Weak}; + +use task::{self, Task}; +use future::Executor; +use sink::SendAll; +use resultstream::{self, Results}; +use unsync::oneshot; +use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream}; + +/// Creates a bounded in-memory channel with buffered storage. +/// +/// This method creates concrete implementations of the `Stream` and `Sink` +/// traits which can be used to communicate a stream of values between tasks +/// with backpressure. The channel capacity is exactly `buffer`. On average, +/// sending a message through this channel performs no dynamic allocation. +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + channel_(Some(buffer)) +} + +fn channel_<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) { + let shared = Rc::new(RefCell::new(Shared { + buffer: VecDeque::new(), + capacity: buffer, + blocked_senders: VecDeque::new(), + blocked_recv: None, + })); + let sender = Sender { shared: Rc::downgrade(&shared) }; + let receiver = Receiver { state: State::Open(shared) }; + (sender, receiver) +} + +#[derive(Debug)] +struct Shared<T> { + buffer: VecDeque<T>, + capacity: Option<usize>, + blocked_senders: VecDeque<Task>, + blocked_recv: Option<Task>, +} + +/// The transmission end of a channel. +/// +/// This is created by the `channel` function. +#[derive(Debug)] +pub struct Sender<T> { + shared: Weak<RefCell<Shared<T>>>, +} + +impl<T> Sender<T> { + fn do_send(&self, msg: T) -> StartSend<T, SendError<T>> { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + None => return Err(SendError(msg)), // receiver was dropped + }; + let mut shared = shared.borrow_mut(); + + match shared.capacity { + Some(capacity) if shared.buffer.len() == capacity => { + shared.blocked_senders.push_back(task::current()); + Ok(AsyncSink::NotReady(msg)) + } + _ => { + shared.buffer.push_back(msg); + if let Some(task) = shared.blocked_recv.take() { + task.notify(); + } + Ok(AsyncSink::Ready) + } + } + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Sender { shared: self.shared.clone() } + } +} + +impl<T> Sink for Sender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.do_send(msg) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + None => return, + }; + // The number of existing `Weak` indicates if we are possibly the last + // `Sender`. If we are the last, we possibly must notify a blocked + // `Receiver`. `self.shared` is always one of the `Weak` to this shared + // data. Therefore the smallest possible Rc::weak_count(&shared) is 1. + if Rc::weak_count(&shared) == 1 { + if let Some(task) = shared.borrow_mut().blocked_recv.take() { + // Wake up receiver as its stream has ended + task.notify(); + } + } + } +} + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is created by the `channel` function. +#[derive(Debug)] +pub struct Receiver<T> { + state: State<T>, +} + +/// Possible states of a receiver. We're either Open (can receive more messages) +/// or we're closed with a list of messages we have left to receive. +#[derive(Debug)] +enum State<T> { + Open(Rc<RefCell<Shared<T>>>), + Closed(VecDeque<T>), +} + +impl<T> Receiver<T> { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + let (blockers, items) = match self.state { + State::Open(ref state) => { + let mut state = state.borrow_mut(); + let items = mem::replace(&mut state.buffer, VecDeque::new()); + let blockers = mem::replace(&mut state.blocked_senders, VecDeque::new()); + (blockers, items) + } + State::Closed(_) => return, + }; + self.state = State::Closed(items); + for task in blockers { + task.notify(); + } + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let me = match self.state { + State::Open(ref mut me) => me, + State::Closed(ref mut items) => { + return Ok(Async::Ready(items.pop_front())) + } + }; + + if let Some(shared) = Rc::get_mut(me) { + // All senders have been dropped, so drain the buffer and end the + // stream. + return Ok(Async::Ready(shared.borrow_mut().buffer.pop_front())); + } + + let mut shared = me.borrow_mut(); + if let Some(msg) = shared.buffer.pop_front() { + if let Some(task) = shared.blocked_senders.pop_front() { + drop(shared); + task.notify(); + } + Ok(Async::Ready(Some(msg))) + } else { + shared.blocked_recv = Some(task::current()); + Ok(Async::NotReady) + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + self.close(); + } +} + +/// The transmission end of an unbounded channel. +/// +/// This is created by the `unbounded` function. +#[derive(Debug)] +pub struct UnboundedSender<T>(Sender<T>); + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> Self { + UnboundedSender(self.0.clone()) + } +} + +impl<T> Sink for UnboundedSender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.0.start_send(msg) + } + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<'a, T> Sink for &'a UnboundedSender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.0.do_send(msg) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> UnboundedSender<T> { + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + #[deprecated(note = "renamed to `unbounded_send`")] + #[doc(hidden)] + pub fn send(&self, msg: T) -> Result<(), SendError<T>> { + self.unbounded_send(msg) + } + + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> { + let shared = match self.0.shared.upgrade() { + Some(shared) => shared, + None => return Err(SendError(msg)), + }; + let mut shared = shared.borrow_mut(); + shared.buffer.push_back(msg); + if let Some(task) = shared.blocked_recv.take() { + drop(shared); + task.notify(); + } + Ok(()) + } +} + +/// The receiving end of an unbounded channel. +/// +/// This is created by the `unbounded` function. +#[derive(Debug)] +pub struct UnboundedReceiver<T>(Receiver<T>); + +impl<T> UnboundedReceiver<T> { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.0.close(); + } +} + +impl<T> Stream for UnboundedReceiver<T> { + type Item = T; + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.0.poll() + } +} + +/// Creates an unbounded in-memory channel with buffered storage. +/// +/// Identical semantics to `channel`, except with no limit to buffer size. +pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let (send, recv) = channel_(None); + (UnboundedSender(send), UnboundedReceiver(recv)) +} + +/// Error type for sending, used when the receiving end of a channel is +/// dropped +pub struct SendError<T>(T); + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SendError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "send failed because receiver is gone") + } +} + +impl<T: Any> Error for SendError<T> { + fn description(&self) -> &str { + "send failed because receiver is gone" + } +} + +impl<T> SendError<T> { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} + +/// Handle returned from the `spawn` function. +/// +/// This handle is a stream that proxies a stream on a separate `Executor`. +/// Created through the `mpsc::spawn` function, this handle will produce +/// the same values as the proxied stream, as they are produced in the executor, +/// and uses a limited buffer to exert back-pressure on the remote stream. +/// +/// If this handle is dropped, then the stream will no longer be polled and is +/// scheduled to be dropped. +pub struct SpawnHandle<Item, Error> { + inner: Receiver<Result<Item, Error>>, + _cancel_tx: oneshot::Sender<()>, +} + +/// Type of future which `Executor` instances must be able to execute for `spawn`. +pub struct Execute<S: Stream> { + inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>, + cancel_rx: oneshot::Receiver<()>, +} + +/// Spawns a `stream` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the remote stream. +/// +/// The `stream` will be canceled if the `SpawnHandle` is dropped. +/// +/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. +/// When `stream` has additional items available, then the `SpawnHandle` +/// will have those same items available. +/// +/// At most `buffer + 1` elements will be buffered at a time. If the buffer +/// is full, then `stream` will stop progressing until more space is available. +/// This allows the `SpawnHandle` to exert backpressure on the `stream`. +/// +/// # Panics +/// +/// This function will panic if `executor` is unable spawn a `Future` containing +/// the entirety of the `stream`. +pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error> + where S: Stream, + E: Executor<Execute<S>> +{ + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (tx, rx) = channel(buffer); + executor.execute(Execute { + inner: tx.send_all(resultstream::new(stream)), + cancel_rx: cancel_rx, + }).expect("failed to spawn stream"); + SpawnHandle { + inner: rx, + _cancel_tx: cancel_tx, + } +} + +/// Spawns a `stream` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the remote stream, with unbounded buffering. +/// +/// The `stream` will be canceled if the `SpawnHandle` is dropped. +/// +/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. +/// When `stream` has additional items available, then the `SpawnHandle` +/// will have those same items available. +/// +/// An unbounded buffer is used, which means that values will be buffered as +/// fast as `stream` can produce them, without any backpressure. Therefore, if +/// `stream` is an infinite stream, it can use an unbounded amount of memory, and +/// potentially hog CPU resources. In particular, if `stream` is infinite +/// and doesn't ever yield (by returning `Async::NotReady` from `poll`), it +/// will result in an infinite loop. +/// +/// # Panics +/// +/// This function will panic if `executor` is unable spawn a `Future` containing +/// the entirety of the `stream`. +pub fn spawn_unbounded<S,E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error> + where S: Stream, + E: Executor<Execute<S>> +{ + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (tx, rx) = channel_(None); + executor.execute(Execute { + inner: tx.send_all(resultstream::new(stream)), + cancel_rx: cancel_rx, + }).expect("failed to spawn stream"); + SpawnHandle { + inner: rx, + _cancel_tx: cancel_tx, + } +} + +impl<I, E> Stream for SpawnHandle<I, E> { + type Item = I; + type Error = E; + + fn poll(&mut self) -> Poll<Option<I>, E> { + match self.inner.poll() { + Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))), + Ok(Async::Ready(Some(Err(e)))) => Err(e), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => unreachable!("mpsc::Receiver should never return Err"), + } + } +} + +impl<I, E> fmt::Debug for SpawnHandle<I, E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl<S: Stream> Future for Execute<S> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + match self.cancel_rx.poll() { + Ok(Async::NotReady) => (), + _ => return Ok(Async::Ready(())), + } + match self.inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + _ => Ok(Async::Ready(())) + } + } +} + +impl<S: Stream> fmt::Debug for Execute<S> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .finish() + } +} |