summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/unsync/mpsc.rs
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/unsync/mpsc.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/unsync/mpsc.rs474
1 files changed, 474 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs
new file mode 100644
index 0000000000..ba0d52dc98
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/unsync/mpsc.rs
@@ -0,0 +1,474 @@
+//! A multi-producer, single-consumer, futures-aware, FIFO queue with back
+//! pressure, for use communicating between tasks on the same thread.
+//!
+//! These queues are the same as those in `futures::sync`, except they're not
+//! intended to be sent across threads.
+
+use std::any::Any;
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::error::Error;
+use std::fmt;
+use std::mem;
+use std::rc::{Rc, Weak};
+
+use task::{self, Task};
+use future::Executor;
+use sink::SendAll;
+use resultstream::{self, Results};
+use unsync::oneshot;
+use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream};
+
+/// Creates a bounded in-memory channel with buffered storage.
+///
+/// This method creates concrete implementations of the `Stream` and `Sink`
+/// traits which can be used to communicate a stream of values between tasks
+/// with backpressure. The channel capacity is exactly `buffer`. On average,
+/// sending a message through this channel performs no dynamic allocation.
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+ channel_(Some(buffer))
+}
+
+fn channel_<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
+ let shared = Rc::new(RefCell::new(Shared {
+ buffer: VecDeque::new(),
+ capacity: buffer,
+ blocked_senders: VecDeque::new(),
+ blocked_recv: None,
+ }));
+ let sender = Sender { shared: Rc::downgrade(&shared) };
+ let receiver = Receiver { state: State::Open(shared) };
+ (sender, receiver)
+}
+
+#[derive(Debug)]
+struct Shared<T> {
+ buffer: VecDeque<T>,
+ capacity: Option<usize>,
+ blocked_senders: VecDeque<Task>,
+ blocked_recv: Option<Task>,
+}
+
+/// The transmission end of a channel.
+///
+/// This is created by the `channel` function.
+#[derive(Debug)]
+pub struct Sender<T> {
+ shared: Weak<RefCell<Shared<T>>>,
+}
+
+impl<T> Sender<T> {
+ fn do_send(&self, msg: T) -> StartSend<T, SendError<T>> {
+ let shared = match self.shared.upgrade() {
+ Some(shared) => shared,
+ None => return Err(SendError(msg)), // receiver was dropped
+ };
+ let mut shared = shared.borrow_mut();
+
+ match shared.capacity {
+ Some(capacity) if shared.buffer.len() == capacity => {
+ shared.blocked_senders.push_back(task::current());
+ Ok(AsyncSink::NotReady(msg))
+ }
+ _ => {
+ shared.buffer.push_back(msg);
+ if let Some(task) = shared.blocked_recv.take() {
+ task.notify();
+ }
+ Ok(AsyncSink::Ready)
+ }
+ }
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ Sender { shared: self.shared.clone() }
+ }
+}
+
+impl<T> Sink for Sender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.do_send(msg)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ let shared = match self.shared.upgrade() {
+ Some(shared) => shared,
+ None => return,
+ };
+ // The number of existing `Weak` indicates if we are possibly the last
+ // `Sender`. If we are the last, we possibly must notify a blocked
+ // `Receiver`. `self.shared` is always one of the `Weak` to this shared
+ // data. Therefore the smallest possible Rc::weak_count(&shared) is 1.
+ if Rc::weak_count(&shared) == 1 {
+ if let Some(task) = shared.borrow_mut().blocked_recv.take() {
+ // Wake up receiver as its stream has ended
+ task.notify();
+ }
+ }
+ }
+}
+
+/// The receiving end of a channel which implements the `Stream` trait.
+///
+/// This is created by the `channel` function.
+#[derive(Debug)]
+pub struct Receiver<T> {
+ state: State<T>,
+}
+
+/// Possible states of a receiver. We're either Open (can receive more messages)
+/// or we're closed with a list of messages we have left to receive.
+#[derive(Debug)]
+enum State<T> {
+ Open(Rc<RefCell<Shared<T>>>),
+ Closed(VecDeque<T>),
+}
+
+impl<T> Receiver<T> {
+ /// Closes the receiving half
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ let (blockers, items) = match self.state {
+ State::Open(ref state) => {
+ let mut state = state.borrow_mut();
+ let items = mem::replace(&mut state.buffer, VecDeque::new());
+ let blockers = mem::replace(&mut state.blocked_senders, VecDeque::new());
+ (blockers, items)
+ }
+ State::Closed(_) => return,
+ };
+ self.state = State::Closed(items);
+ for task in blockers {
+ task.notify();
+ }
+ }
+}
+
+impl<T> Stream for Receiver<T> {
+ type Item = T;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let me = match self.state {
+ State::Open(ref mut me) => me,
+ State::Closed(ref mut items) => {
+ return Ok(Async::Ready(items.pop_front()))
+ }
+ };
+
+ if let Some(shared) = Rc::get_mut(me) {
+ // All senders have been dropped, so drain the buffer and end the
+ // stream.
+ return Ok(Async::Ready(shared.borrow_mut().buffer.pop_front()));
+ }
+
+ let mut shared = me.borrow_mut();
+ if let Some(msg) = shared.buffer.pop_front() {
+ if let Some(task) = shared.blocked_senders.pop_front() {
+ drop(shared);
+ task.notify();
+ }
+ Ok(Async::Ready(Some(msg)))
+ } else {
+ shared.blocked_recv = Some(task::current());
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ self.close();
+ }
+}
+
+/// The transmission end of an unbounded channel.
+///
+/// This is created by the `unbounded` function.
+#[derive(Debug)]
+pub struct UnboundedSender<T>(Sender<T>);
+
+impl<T> Clone for UnboundedSender<T> {
+ fn clone(&self) -> Self {
+ UnboundedSender(self.0.clone())
+ }
+}
+
+impl<T> Sink for UnboundedSender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.0.start_send(msg)
+ }
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<'a, T> Sink for &'a UnboundedSender<T> {
+ type SinkItem = T;
+ type SinkError = SendError<T>;
+
+ fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
+ self.0.do_send(msg)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), SendError<T>> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> UnboundedSender<T> {
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ #[deprecated(note = "renamed to `unbounded_send`")]
+ #[doc(hidden)]
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ self.unbounded_send(msg)
+ }
+
+ /// Sends the provided message along this channel.
+ ///
+ /// This is an unbounded sender, so this function differs from `Sink::send`
+ /// by ensuring the return type reflects that the channel is always ready to
+ /// receive messages.
+ pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
+ let shared = match self.0.shared.upgrade() {
+ Some(shared) => shared,
+ None => return Err(SendError(msg)),
+ };
+ let mut shared = shared.borrow_mut();
+ shared.buffer.push_back(msg);
+ if let Some(task) = shared.blocked_recv.take() {
+ drop(shared);
+ task.notify();
+ }
+ Ok(())
+ }
+}
+
+/// The receiving end of an unbounded channel.
+///
+/// This is created by the `unbounded` function.
+#[derive(Debug)]
+pub struct UnboundedReceiver<T>(Receiver<T>);
+
+impl<T> UnboundedReceiver<T> {
+ /// Closes the receiving half
+ ///
+ /// This prevents any further messages from being sent on the channel while
+ /// still enabling the receiver to drain messages that are buffered.
+ pub fn close(&mut self) {
+ self.0.close();
+ }
+}
+
+impl<T> Stream for UnboundedReceiver<T> {
+ type Item = T;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.0.poll()
+ }
+}
+
+/// Creates an unbounded in-memory channel with buffered storage.
+///
+/// Identical semantics to `channel`, except with no limit to buffer size.
+pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+ let (send, recv) = channel_(None);
+ (UnboundedSender(send), UnboundedReceiver(recv))
+}
+
+/// Error type for sending, used when the receiving end of a channel is
+/// dropped
+pub struct SendError<T>(T);
+
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("SendError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "send failed because receiver is gone")
+ }
+}
+
+impl<T: Any> Error for SendError<T> {
+ fn description(&self) -> &str {
+ "send failed because receiver is gone"
+ }
+}
+
+impl<T> SendError<T> {
+ /// Returns the message that was attempted to be sent but failed.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+/// Handle returned from the `spawn` function.
+///
+/// This handle is a stream that proxies a stream on a separate `Executor`.
+/// Created through the `mpsc::spawn` function, this handle will produce
+/// the same values as the proxied stream, as they are produced in the executor,
+/// and uses a limited buffer to exert back-pressure on the remote stream.
+///
+/// If this handle is dropped, then the stream will no longer be polled and is
+/// scheduled to be dropped.
+pub struct SpawnHandle<Item, Error> {
+ inner: Receiver<Result<Item, Error>>,
+ _cancel_tx: oneshot::Sender<()>,
+}
+
+/// Type of future which `Executor` instances must be able to execute for `spawn`.
+pub struct Execute<S: Stream> {
+ inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>,
+ cancel_rx: oneshot::Receiver<()>,
+}
+
+/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the remote stream.
+///
+/// The `stream` will be canceled if the `SpawnHandle` is dropped.
+///
+/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
+/// When `stream` has additional items available, then the `SpawnHandle`
+/// will have those same items available.
+///
+/// At most `buffer + 1` elements will be buffered at a time. If the buffer
+/// is full, then `stream` will stop progressing until more space is available.
+/// This allows the `SpawnHandle` to exert backpressure on the `stream`.
+///
+/// # Panics
+///
+/// This function will panic if `executor` is unable spawn a `Future` containing
+/// the entirety of the `stream`.
+pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error>
+ where S: Stream,
+ E: Executor<Execute<S>>
+{
+ let (cancel_tx, cancel_rx) = oneshot::channel();
+ let (tx, rx) = channel(buffer);
+ executor.execute(Execute {
+ inner: tx.send_all(resultstream::new(stream)),
+ cancel_rx: cancel_rx,
+ }).expect("failed to spawn stream");
+ SpawnHandle {
+ inner: rx,
+ _cancel_tx: cancel_tx,
+ }
+}
+
+/// Spawns a `stream` onto the instance of `Executor` provided, `executor`,
+/// returning a handle representing the remote stream, with unbounded buffering.
+///
+/// The `stream` will be canceled if the `SpawnHandle` is dropped.
+///
+/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself.
+/// When `stream` has additional items available, then the `SpawnHandle`
+/// will have those same items available.
+///
+/// An unbounded buffer is used, which means that values will be buffered as
+/// fast as `stream` can produce them, without any backpressure. Therefore, if
+/// `stream` is an infinite stream, it can use an unbounded amount of memory, and
+/// potentially hog CPU resources. In particular, if `stream` is infinite
+/// and doesn't ever yield (by returning `Async::NotReady` from `poll`), it
+/// will result in an infinite loop.
+///
+/// # Panics
+///
+/// This function will panic if `executor` is unable spawn a `Future` containing
+/// the entirety of the `stream`.
+pub fn spawn_unbounded<S,E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error>
+ where S: Stream,
+ E: Executor<Execute<S>>
+{
+ let (cancel_tx, cancel_rx) = oneshot::channel();
+ let (tx, rx) = channel_(None);
+ executor.execute(Execute {
+ inner: tx.send_all(resultstream::new(stream)),
+ cancel_rx: cancel_rx,
+ }).expect("failed to spawn stream");
+ SpawnHandle {
+ inner: rx,
+ _cancel_tx: cancel_tx,
+ }
+}
+
+impl<I, E> Stream for SpawnHandle<I, E> {
+ type Item = I;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<I>, E> {
+ match self.inner.poll() {
+ Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))),
+ Ok(Async::Ready(Some(Err(e)))) => Err(e),
+ Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Err(_) => unreachable!("mpsc::Receiver should never return Err"),
+ }
+ }
+}
+
+impl<I, E> fmt::Debug for SpawnHandle<I, E> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("SpawnHandle")
+ .finish()
+ }
+}
+
+impl<S: Stream> Future for Execute<S> {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ match self.cancel_rx.poll() {
+ Ok(Async::NotReady) => (),
+ _ => return Ok(Async::Ready(())),
+ }
+ match self.inner.poll() {
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ _ => Ok(Async::Ready(()))
+ }
+ }
+}
+
+impl<S: Stream> fmt::Debug for Execute<S> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Execute")
+ .finish()
+ }
+}