diff options
Diffstat (limited to 'third_party/rust/futures-util/src/stream/stream/buffered.rs')
-rw-r--r-- | third_party/rust/futures-util/src/stream/stream/buffered.rs | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/stream/stream/buffered.rs b/third_party/rust/futures-util/src/stream/stream/buffered.rs new file mode 100644 index 0000000000..5854eb7ea5 --- /dev/null +++ b/third_party/rust/futures-util/src/stream/stream/buffered.rs @@ -0,0 +1,118 @@ +use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt}; +use core::fmt; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`buffered`](super::StreamExt::buffered) method. + #[must_use = "streams do nothing unless polled"] + pub struct Buffered<St> + where + St: Stream, + St::Item: Future, + { + #[pin] + stream: Fuse<St>, + in_progress_queue: FuturesOrdered<St::Item>, + max: usize, + } +} + +impl<St> fmt::Debug for Buffered<St> +where + St: Stream + fmt::Debug, + St::Item: Future, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Buffered") + .field("stream", &self.stream) + .field("in_progress_queue", &self.in_progress_queue) + .field("max", &self.max) + .finish() + } +} + +impl<St> Buffered<St> +where + St: Stream, + St::Item: Future, +{ + pub(super) fn new(stream: St, n: usize) -> Self { + Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n } + } + + delegate_access_inner!(stream, St, (.)); +} + +impl<St> Stream for Buffered<St> +where + St: Stream, + St::Item: Future, +{ + type Item = <St::Item as Future>::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + + // First up, try to spawn off as many futures as possible by filling up + // our queue of futures. + while this.in_progress_queue.len() < *this.max { + match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), + Poll::Ready(None) | Poll::Pending => break, + } + } + + // Attempt to pull the next value from the in_progress_queue + let res = this.in_progress_queue.poll_next_unpin(cx); + if let Some(val) = ready!(res) { + return Poll::Ready(Some(val)); + } + + // If more values are still coming from the stream, we're not done yet + if this.stream.is_done() { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let queue_len = self.in_progress_queue.len(); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(queue_len); + let upper = match upper { + Some(x) => x.checked_add(queue_len), + None => None, + }; + (lower, upper) + } +} + +impl<St> FusedStream for Buffered<St> +where + St: Stream, + St::Item: Future, +{ + fn is_terminated(&self) -> bool { + self.stream.is_done() && self.in_progress_queue.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Item> Sink<Item> for Buffered<S> +where + S: Stream + Sink<Item>, + S::Item: Future, +{ + type Error = S::Error; + + delegate_sink!(stream, Item); +} |