diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/stream | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/stream')
23 files changed, 2708 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/stream/all.rs b/third_party/rust/tokio/src/stream/all.rs new file mode 100644 index 0000000000..615665d270 --- /dev/null +++ b/third_party/rust/tokio/src/stream/all.rs @@ -0,0 +1,45 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`all`](super::StreamExt::all) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct AllFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, +} + +impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St: ?Sized + Unpin, F> Unpin for AllFuture<'_, St, F> {} + +impl<St, F> Future for AllFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + + match next { + Some(v) => { + if !(&mut self.f)(v) { + Poll::Ready(false) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + None => Poll::Ready(true), + } + } +} diff --git a/third_party/rust/tokio/src/stream/any.rs b/third_party/rust/tokio/src/stream/any.rs new file mode 100644 index 0000000000..f2ecad5edb --- /dev/null +++ b/third_party/rust/tokio/src/stream/any.rs @@ -0,0 +1,45 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`any`](super::StreamExt::any) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct AnyFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, +} + +impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St: ?Sized + Unpin, F> Unpin for AnyFuture<'_, St, F> {} + +impl<St, F> Future for AnyFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + + match next { + Some(v) => { + if (&mut self.f)(v) { + Poll::Ready(true) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + None => Poll::Ready(false), + } + } +} diff --git a/third_party/rust/tokio/src/stream/chain.rs b/third_party/rust/tokio/src/stream/chain.rs new file mode 100644 index 0000000000..5f0324a4b5 --- /dev/null +++ b/third_party/rust/tokio/src/stream/chain.rs @@ -0,0 +1,57 @@ +use crate::stream::{Fuse, Stream}; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`chain`](super::StreamExt::chain) method. + pub struct Chain<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: U, + } +} + +impl<T, U> Chain<T, U> { + pub(super) fn new(a: T, b: U) -> Chain<T, U> + where + T: Stream, + U: Stream, + { + Chain { a: Fuse::new(a), b } + } +} + +impl<T, U> Stream for Chain<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + use Poll::Ready; + + let me = self.project(); + + if let Some(v) = ready!(me.a.poll_next(cx)) { + return Ready(Some(v)); + } + + me.b.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (a_lower, a_upper) = self.a.size_hint(); + let (b_lower, b_upper) = self.b.size_hint(); + + let upper = match (a_upper, b_upper) { + (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper), + _ => None, + }; + + (a_lower + b_lower, upper) + } +} diff --git a/third_party/rust/tokio/src/stream/collect.rs b/third_party/rust/tokio/src/stream/collect.rs new file mode 100644 index 0000000000..f44c72b7b3 --- /dev/null +++ b/third_party/rust/tokio/src/stream/collect.rs @@ -0,0 +1,246 @@ +use crate::stream::Stream; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use core::future::Future; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +// Do not export this struct until `FromStream` can be unsealed. +pin_project! { + /// Future returned by the [`collect`](super::StreamExt::collect) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Collect<T, U> + where + T: Stream, + U: FromStream<T::Item>, + { + #[pin] + stream: T, + collection: U::Collection, + } +} + +/// Convert from a [`Stream`](crate::stream::Stream). +/// +/// This trait is not intended to be used directly. Instead, call +/// [`StreamExt::collect()`](super::StreamExt::collect). +/// +/// # Implementing +/// +/// Currently, this trait may not be implemented by third parties. The trait is +/// sealed in order to make changes in the future. Stabilization is pending +/// enhancements to the Rust langague. +pub trait FromStream<T>: sealed::FromStreamPriv<T> {} + +impl<T, U> Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + pub(super) fn new(stream: T) -> Collect<T, U> { + let (lower, upper) = stream.size_hint(); + let collection = U::initialize(lower, upper); + + Collect { stream, collection } + } +} + +impl<T, U> Future for Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + type Output = U; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> { + use Poll::Ready; + + loop { + let mut me = self.as_mut().project(); + + let item = match ready!(me.stream.poll_next(cx)) { + Some(item) => item, + None => { + return Ready(U::finalize(&mut me.collection)); + } + }; + + if !U::extend(&mut me.collection, item) { + return Ready(U::finalize(&mut me.collection)); + } + } + } +} + +// ===== FromStream implementations + +impl FromStream<()> for () {} + +impl sealed::FromStreamPriv<()> for () { + type Collection = (); + + fn initialize(_lower: usize, _upper: Option<usize>) {} + + fn extend(_collection: &mut (), _item: ()) -> bool { + true + } + + fn finalize(_collection: &mut ()) {} +} + +impl<T: AsRef<str>> FromStream<T> for String {} + +impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { + type Collection = String; + + fn initialize(_lower: usize, _upper: Option<usize>) -> String { + String::new() + } + + fn extend(collection: &mut String, item: T) -> bool { + collection.push_str(item.as_ref()); + true + } + + fn finalize(collection: &mut String) -> String { + mem::replace(collection, String::new()) + } +} + +impl<T> FromStream<T> for Vec<T> {} + +impl<T> sealed::FromStreamPriv<T> for Vec<T> { + type Collection = Vec<T>; + + fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> { + Vec::with_capacity(lower) + } + + fn extend(collection: &mut Vec<T>, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(collection: &mut Vec<T>) -> Vec<T> { + mem::replace(collection, vec![]) + } +} + +impl<T> FromStream<T> for Box<[T]> {} + +impl<T> sealed::FromStreamPriv<T> for Box<[T]> { + type Collection = Vec<T>; + + fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> { + <Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper) + } + + fn extend(collection: &mut Vec<T>, item: T) -> bool { + <Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item) + } + + fn finalize(collection: &mut Vec<T>) -> Box<[T]> { + <Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice() + } +} + +impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {} + +impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> +where + U: FromStream<T>, +{ + type Collection = Result<U::Collection, E>; + + fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> { + Ok(U::initialize(lower, upper)) + } + + fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool { + assert!(collection.is_ok()); + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(collection, item) + } + Err(err) => { + *collection = Err(err); + false + } + } + } + + fn finalize(collection: &mut Self::Collection) -> Result<U, E> { + if let Ok(collection) = collection.as_mut() { + Ok(U::finalize(collection)) + } else { + let res = mem::replace(collection, Ok(U::initialize(0, Some(0)))); + + if let Err(err) = res { + Err(err) + } else { + unreachable!(); + } + } + } +} + +impl<T: Buf> FromStream<T> for Bytes {} + +impl<T: Buf> sealed::FromStreamPriv<T> for Bytes { + type Collection = BytesMut; + + fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut { + BytesMut::new() + } + + fn extend(collection: &mut BytesMut, item: T) -> bool { + collection.put(item); + true + } + + fn finalize(collection: &mut BytesMut) -> Bytes { + mem::replace(collection, BytesMut::new()).freeze() + } +} + +impl<T: Buf> FromStream<T> for BytesMut {} + +impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut { + type Collection = BytesMut; + + fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut { + BytesMut::new() + } + + fn extend(collection: &mut BytesMut, item: T) -> bool { + collection.put(item); + true + } + + fn finalize(collection: &mut BytesMut) -> BytesMut { + mem::replace(collection, BytesMut::new()) + } +} + +pub(crate) mod sealed { + #[doc(hidden)] + pub trait FromStreamPriv<T> { + /// Intermediate type used during collection process + type Collection; + + /// Initialize the collection + fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection; + + /// Extend the collection with the received item + /// + /// Return `true` to continue streaming, `false` complete collection. + fn extend(collection: &mut Self::Collection, item: T) -> bool; + + /// Finalize collection into target type. + fn finalize(collection: &mut Self::Collection) -> Self; + } +} diff --git a/third_party/rust/tokio/src/stream/empty.rs b/third_party/rust/tokio/src/stream/empty.rs new file mode 100644 index 0000000000..6118673e50 --- /dev/null +++ b/third_party/rust/tokio/src/stream/empty.rs @@ -0,0 +1,50 @@ +use crate::stream::Stream; + +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`empty`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Empty<T>(PhantomData<T>); + +impl<T> Unpin for Empty<T> {} +unsafe impl<T> Send for Empty<T> {} +unsafe impl<T> Sync for Empty<T> {} + +/// Creates a stream that yields nothing. +/// +/// The returned stream is immediately ready and returns `None`. Use +/// [`stream::pending()`](super::pending()) to obtain a stream that is never +/// ready. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut none = stream::empty::<i32>(); +/// +/// assert_eq!(None, none.next().await); +/// } +/// ``` +pub const fn empty<T>() -> Empty<T> { + Empty(PhantomData) +} + +impl<T> Stream for Empty<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { + Poll::Ready(None) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, Some(0)) + } +} diff --git a/third_party/rust/tokio/src/stream/filter.rs b/third_party/rust/tokio/src/stream/filter.rs new file mode 100644 index 0000000000..799630b234 --- /dev/null +++ b/third_party/rust/tokio/src/stream/filter.rs @@ -0,0 +1,58 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`filter`](super::StreamExt::filter) method. + #[must_use = "streams do nothing unless polled"] + pub struct Filter<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Filter<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Filter") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> Filter<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St, F> Stream for Filter<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if (self.as_mut().project().f)(&e) { + return Poll::Ready(Some(e)); + } + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate + } +} diff --git a/third_party/rust/tokio/src/stream/filter_map.rs b/third_party/rust/tokio/src/stream/filter_map.rs new file mode 100644 index 0000000000..8dc05a5460 --- /dev/null +++ b/third_party/rust/tokio/src/stream/filter_map.rs @@ -0,0 +1,58 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`filter_map`](super::StreamExt::filter_map) method. + #[must_use = "streams do nothing unless polled"] + pub struct FilterMap<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for FilterMap<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FilterMap") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> FilterMap<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St, F, T> Stream for FilterMap<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if let Some(e) = (self.as_mut().project().f)(e) { + return Poll::Ready(Some(e)); + } + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate + } +} diff --git a/third_party/rust/tokio/src/stream/fold.rs b/third_party/rust/tokio/src/stream/fold.rs new file mode 100644 index 0000000000..7b9fead3db --- /dev/null +++ b/third_party/rust/tokio/src/stream/fold.rs @@ -0,0 +1,51 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future returned by the [`fold`](super::StreamExt::fold) method. + #[derive(Debug)] + pub struct FoldFuture<St, B, F> { + #[pin] + stream: St, + acc: Option<B>, + f: F, + } +} + +impl<St, B, F> FoldFuture<St, B, F> { + pub(super) fn new(stream: St, init: B, f: F) -> Self { + Self { + stream, + acc: Some(init), + f, + } + } +} + +impl<St, B, F> Future for FoldFuture<St, B, F> +where + St: Stream, + F: FnMut(B, St::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + let next = ready!(me.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let old = me.acc.take().unwrap(); + let new = (me.f)(old, v); + *me.acc = Some(new); + } + None => return Poll::Ready(me.acc.take().unwrap()), + } + } + } +} diff --git a/third_party/rust/tokio/src/stream/fuse.rs b/third_party/rust/tokio/src/stream/fuse.rs new file mode 100644 index 0000000000..6c9e02d664 --- /dev/null +++ b/third_party/rust/tokio/src/stream/fuse.rs @@ -0,0 +1,53 @@ +use crate::stream::Stream; + +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Stream returned by [`fuse()`][super::StreamExt::fuse]. + #[derive(Debug)] + pub struct Fuse<T> { + #[pin] + stream: Option<T>, + } +} + +impl<T> Fuse<T> +where + T: Stream, +{ + pub(crate) fn new(stream: T) -> Fuse<T> { + Fuse { + stream: Some(stream), + } + } +} + +impl<T> Stream for Fuse<T> +where + T: Stream, +{ + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let res = match Option::as_pin_mut(self.as_mut().project().stream) { + Some(stream) => ready!(stream.poll_next(cx)), + None => return Poll::Ready(None), + }; + + if res.is_none() { + // Do not poll the stream anymore + self.as_mut().project().stream.set(None); + } + + Poll::Ready(res) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self.stream { + Some(ref stream) => stream.size_hint(), + None => (0, Some(0)), + } + } +} diff --git a/third_party/rust/tokio/src/stream/iter.rs b/third_party/rust/tokio/src/stream/iter.rs new file mode 100644 index 0000000000..36eeb5612f --- /dev/null +++ b/third_party/rust/tokio/src/stream/iter.rs @@ -0,0 +1,55 @@ +use crate::stream::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`iter`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Iter<I> { + iter: I, +} + +impl<I> Unpin for Iter<I> {} + +/// Converts an `Iterator` into a `Stream` which is always ready +/// to yield the next value. +/// +/// Iterators in Rust don't express the ability to block, so this adapter +/// simply always calls `iter.next()` and returns that. +/// +/// ``` +/// # async fn dox() { +/// use tokio::stream::{self, StreamExt}; +/// +/// let mut stream = stream::iter(vec![17, 19]); +/// +/// assert_eq!(stream.next().await, Some(17)); +/// assert_eq!(stream.next().await, Some(19)); +/// assert_eq!(stream.next().await, None); +/// # } +/// ``` +pub fn iter<I>(i: I) -> Iter<I::IntoIter> +where + I: IntoIterator, +{ + Iter { + iter: i.into_iter(), + } +} + +impl<I> Stream for Iter<I> +where + I: Iterator, +{ + type Item = I::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> { + ready!(crate::coop::poll_proceed(cx)); + Poll::Ready(self.iter.next()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/third_party/rust/tokio/src/stream/map.rs b/third_party/rust/tokio/src/stream/map.rs new file mode 100644 index 0000000000..dfac5a2c94 --- /dev/null +++ b/third_party/rust/tokio/src/stream/map.rs @@ -0,0 +1,51 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Map<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map").field("stream", &self.stream).finish() + } +} + +impl<St, F> Map<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Map { stream, f } + } +} + +impl<St, F, T> Stream for Map<St, F> +where + St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.as_mut() + .project() + .stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/third_party/rust/tokio/src/stream/merge.rs b/third_party/rust/tokio/src/stream/merge.rs new file mode 100644 index 0000000000..4850cd40c7 --- /dev/null +++ b/third_party/rust/tokio/src/stream/merge.rs @@ -0,0 +1,97 @@ +use crate::stream::{Fuse, Stream}; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`merge`](super::StreamExt::merge) method. + pub struct Merge<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: Fuse<U>, + // When `true`, poll `a` first, otherwise, `poll` b`. + a_first: bool, + } +} + +impl<T, U> Merge<T, U> { + pub(super) fn new(a: T, b: U) -> Merge<T, U> + where + T: Stream, + U: Stream, + { + Merge { + a: Fuse::new(a), + b: Fuse::new(b), + a_first: true, + } + } +} + +impl<T, U> Stream for Merge<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let me = self.project(); + let a_first = *me.a_first; + + // Toggle the flag + *me.a_first = !a_first; + + if a_first { + poll_next(me.a, me.b, cx) + } else { + poll_next(me.b, me.a, cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (a_lower, a_upper) = self.a.size_hint(); + let (b_lower, b_upper) = self.b.size_hint(); + + let upper = match (a_upper, b_upper) { + (Some(a_upper), Some(b_upper)) => Some(a_upper + b_upper), + _ => None, + }; + + (a_lower + b_lower, upper) + } +} + +fn poll_next<T, U>( + first: Pin<&mut T>, + second: Pin<&mut U>, + cx: &mut Context<'_>, +) -> Poll<Option<T::Item>> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + use Poll::*; + + let mut done = true; + + match first.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + match second.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + if done { + Ready(None) + } else { + Pending + } +} diff --git a/third_party/rust/tokio/src/stream/mod.rs b/third_party/rust/tokio/src/stream/mod.rs new file mode 100644 index 0000000000..307ead5fba --- /dev/null +++ b/third_party/rust/tokio/src/stream/mod.rs @@ -0,0 +1,819 @@ +//! Stream utilities for Tokio. +//! +//! A `Stream` is an asynchronous sequence of values. It can be thought of as an asynchronous version of the standard library's `Iterator` trait. +//! +//! This module provides helpers to work with them. + +mod all; +use all::AllFuture; + +mod any; +use any::AnyFuture; + +mod chain; +use chain::Chain; + +mod collect; +use collect::Collect; +pub use collect::FromStream; + +mod empty; +pub use empty::{empty, Empty}; + +mod filter; +use filter::Filter; + +mod filter_map; +use filter_map::FilterMap; + +mod fold; +use fold::FoldFuture; + +mod fuse; +use fuse::Fuse; + +mod iter; +pub use iter::{iter, Iter}; + +mod map; +use map::Map; + +mod merge; +use merge::Merge; + +mod next; +use next::Next; + +mod once; +pub use once::{once, Once}; + +mod pending; +pub use pending::{pending, Pending}; + +mod stream_map; +pub use stream_map::StreamMap; + +mod skip; +use skip::Skip; + +mod skip_while; +use skip_while::SkipWhile; + +mod try_next; +use try_next::TryNext; + +mod take; +use take::Take; + +mod take_while; +use take_while::TakeWhile; + +cfg_time! { + mod timeout; + use timeout::Timeout; + use std::time::Duration; +} + +pub use futures_core::Stream; + +/// An extension trait for `Stream`s that provides a variety of convenient +/// combinator functions. +pub trait StreamExt: Stream { + /// Consumes and returns the next value in the stream or `None` if the + /// stream is finished. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option<Self::Item>; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Consumes and returns the next item in the stream. If an error is + /// encountered before the next item, the error is returned instead. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn try_next(&mut self) -> Result<Option<T>, E>; + /// ``` + /// + /// This is similar to the [`next`](StreamExt::next) combinator, + /// but returns a [`Result<Option<T>, E>`](Result) rather than + /// an [`Option<Result<T, E>>`](Option), making for easy use + /// with the [`?`](std::ops::Try) operator. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(1))); + /// assert_eq!(stream.try_next().await, Ok(Some(2))); + /// assert_eq!(stream.try_next().await, Err("nope")); + /// # } + /// ``` + fn try_next<T, E>(&mut self) -> TryNext<'_, Self> + where + Self: Stream<Item = Result<T, E>> + Unpin, + { + TryNext::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map<T, F>(self, f: F) -> Map<Self, F> + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } + + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::StreamExt; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(basic_scheduler)] + /// async fn main() { + /// # time::pause(); + /// let (mut tx1, rx1) = mpsc::channel(10); + /// let (mut tx2, rx2) = mpsc::channel(10); + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::delay_for(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::delay_for(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::delay_for(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge<U>(self, other: U) -> Merge<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Merge::new(self, other) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate `f` + /// will be run against them. If the predicate + /// resolves to `true`, then the stream will yield the value, but if the + /// predicate resolves to `false`, then the value + /// will be discarded and the next value will be produced. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter(|x| x % 2 == 0); + /// + /// assert_eq!(Some(2), evens.next().await); + /// assert_eq!(Some(4), evens.next().await); + /// assert_eq!(Some(6), evens.next().await); + /// assert_eq!(Some(8), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter<F>(self, f: F) -> Filter<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + Filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type according to the provided closure. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate `f` resolves to + /// [`Some(item)`](Some) then the stream will yield the value `item`, but if + /// it resolves to [`None`] then the next value will be produced. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter_map`] method in the + /// standard library. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter_map(|x| { + /// if x % 2 == 0 { Some(x + 1) } else { None } + /// }); + /// + /// assert_eq!(Some(3), evens.next().await); + /// assert_eq!(Some(5), evens.next().await); + /// assert_eq!(Some(7), evens.next().await); + /// assert_eq!(Some(9), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> + where + F: FnMut(Self::Item) -> Option<T>, + Self: Sized, + { + FilterMap::new(self, f) + } + + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse<Self> + where + Self: Sized, + { + Fuse::new(self) + } + + /// Creates a new stream of at most `n` items of the underlying stream. + /// + /// Once `n` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take(3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take(self, n: usize) -> Take<Self> + where + Self: Sized, + { + Take::new(self, n) + } + + /// Take elements from this stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take_while<F>(self, f: F) -> TakeWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + TakeWhile::new(self, f) + } + + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip<Self> + where + Self: Sized, + { + Skip::new(self, n) + } + + /// Skip elements from the underlying stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false, the rest of the elements will be yielded. + /// + /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); + /// + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(Some(4), stream.next().await); + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + SkipWhile::new(self, f) + } + + /// Tests if every element of the stream matches a predicate. + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all`. If any of them return `false`, it + /// returns `false`. An empty stream returns `true`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).all(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).all(|&x| x > 2).await); + /// # } + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(!iter.all(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&3)); + /// # } + /// ``` + fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AllFuture::new(self, f) + } + + /// Tests if any element of the stream matches a predicate. + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).any(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).any(|&x| x > 5).await); + /// # } + /// ``` + /// + /// Stopping at the first `true`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(iter.any(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&2)); + /// # } + /// ``` + fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AnyFuture::new(self, f) + } + + /// Combine two streams into one by first returning all values from the + /// first stream then all values from the second stream. + /// + /// As long as `self` still has values to emit, no values from `other` are + /// emitted, even if some are ready. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{self, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let one = stream::iter(vec![1, 2, 3]); + /// let two = stream::iter(vec![4, 5, 6]); + /// + /// let mut stream = one.chain(two); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn chain<U>(self, other: U) -> Chain<Self, U> + where + U: Stream<Item = Self::Item>, + Self: Sized, + { + Chain::new(self, other) + } + + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// # Examples + /// Basic usage: + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, *}; + /// + /// let s = stream::iter(vec![1u8, 2, 3]); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # } + /// ``` + fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + + /// Drain stream pushing all emitted values into a collection. + /// + /// `collect` streams all values, awaiting as needed. Values are pushed into + /// a collection. A number of different target collection types are + /// supported, including [`Vec`](std::vec::Vec), + /// [`String`](std::string::String), and [`Bytes`](bytes::Bytes). + /// + /// # `Result` + /// + /// `collect()` can also be used with streams of type `Result<T, E>` where + /// `T: FromStream<_>`. In this case, `collect()` will stream as long as + /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, + /// streaming is terminated and `collect()` returns the `Err`. + /// + /// # Notes + /// + /// `FromStream` is currently a sealed trait. Stabilization is pending + /// enhancements to the Rust langague. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// use tokio::stream::{self, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let doubled: Vec<i32> = + /// stream::iter(vec![1, 2, 3]) + /// .map(|x| x * 2) + /// .collect() + /// .await; + /// + /// assert_eq!(vec![2, 4, 6], doubled); + /// } + /// ``` + /// + /// Collecting a stream of `Result` values + /// + /// ``` + /// use tokio::stream::{self, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// // A stream containing only `Ok` values will be collected + /// let values: Result<Vec<i32>, &str> = + /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) + /// .collect() + /// .await; + /// + /// assert_eq!(Ok(vec![1, 2, 3]), values); + /// + /// // A stream containing `Err` values will return the first error. + /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; + /// + /// let values: Result<Vec<i32>, &str> = + /// stream::iter(results) + /// .collect() + /// .await; + /// + /// assert_eq!(Err("no"), values); + /// } + /// ``` + fn collect<T>(self) -> Collect<Self, T> + where + T: FromStream<Self::Item>, + Self: Sized, + { + Collect::new(self) + } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio::stream::{self, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let mut int_stream = int_stream.timeout(Duration::from_secs(1)); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout(self, duration: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, duration) + } +} + +impl<St: ?Sized> StreamExt for St where St: Stream {} diff --git a/third_party/rust/tokio/src/stream/next.rs b/third_party/rust/tokio/src/stream/next.rs new file mode 100644 index 0000000000..3909c0c233 --- /dev/null +++ b/third_party/rust/tokio/src/stream/next.rs @@ -0,0 +1,28 @@ +use crate::stream::Stream; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`next`](super::StreamExt::next) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, +} + +impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} + +impl<'a, St: ?Sized> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { stream } + } +} + +impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { + type Output = Option<St::Item>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.stream).poll_next(cx) + } +} diff --git a/third_party/rust/tokio/src/stream/once.rs b/third_party/rust/tokio/src/stream/once.rs new file mode 100644 index 0000000000..04a642f309 --- /dev/null +++ b/third_party/rust/tokio/src/stream/once.rs @@ -0,0 +1,52 @@ +use crate::stream::{self, Iter, Stream}; + +use core::option; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`once`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Once<T> { + iter: Iter<option::IntoIter<T>>, +} + +impl<I> Unpin for Once<I> {} + +/// Creates a stream that emits an element exactly once. +/// +/// The returned stream is immediately ready and emits the provided value once. +/// +/// # Examples +/// +/// ``` +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// // one is the loneliest number +/// let mut one = stream::once(1); +/// +/// assert_eq!(Some(1), one.next().await); +/// +/// // just one, that's all we get +/// assert_eq!(None, one.next().await); +/// } +/// ``` +pub fn once<T>(value: T) -> Once<T> { + Once { + iter: stream::iter(Some(value).into_iter()), + } +} + +impl<T> Stream for Once<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + Pin::new(&mut self.iter).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} diff --git a/third_party/rust/tokio/src/stream/pending.rs b/third_party/rust/tokio/src/stream/pending.rs new file mode 100644 index 0000000000..2e06a1c261 --- /dev/null +++ b/third_party/rust/tokio/src/stream/pending.rs @@ -0,0 +1,54 @@ +use crate::stream::Stream; + +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Stream for the [`pending`] function. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Pending<T>(PhantomData<T>); + +impl<T> Unpin for Pending<T> {} +unsafe impl<T> Send for Pending<T> {} +unsafe impl<T> Sync for Pending<T> {} + +/// Creates a stream that is never ready +/// +/// The returned stream is never ready. Attempting to call +/// [`next()`](crate::stream::StreamExt::next) will never complete. Use +/// [`stream::empty()`](super::empty()) to obtain a stream that is is +/// immediately empty but returns no values. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ```no_run +/// use tokio::stream::{self, StreamExt}; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut never = stream::pending::<i32>(); +/// +/// // This will never complete +/// never.next().await; +/// +/// unreachable!(); +/// } +/// ``` +pub const fn pending<T>() -> Pending<T> { + Pending(PhantomData) +} + +impl<T> Stream for Pending<T> { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, None) + } +} diff --git a/third_party/rust/tokio/src/stream/skip.rs b/third_party/rust/tokio/src/stream/skip.rs new file mode 100644 index 0000000000..39540cc984 --- /dev/null +++ b/third_party/rust/tokio/src/stream/skip.rs @@ -0,0 +1,63 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip`](super::StreamExt::skip) method. + #[must_use = "streams do nothing unless polled"] + pub struct Skip<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Skip<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Skip") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Skip<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Skip<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if self.remaining == 0 { + return Poll::Ready(Some(e)); + } + *self.as_mut().project().remaining -= 1; + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_sub(self.remaining); + let upper = upper.map(|x| x.saturating_sub(self.remaining)); + + (lower, upper) + } +} diff --git a/third_party/rust/tokio/src/stream/skip_while.rs b/third_party/rust/tokio/src/stream/skip_while.rs new file mode 100644 index 0000000000..4e0500701a --- /dev/null +++ b/third_party/rust/tokio/src/stream/skip_while.rs @@ -0,0 +1,73 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct SkipWhile<St, F> { + #[pin] + stream: St, + predicate: Option<F>, + } +} + +impl<St, F> fmt::Debug for SkipWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SkipWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> SkipWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate: Some(predicate), + } + } +} + +impl<St, F> Stream for SkipWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + if let Some(predicate) = this.predicate { + loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + if !(predicate)(&item) { + *this.predicate = None; + return Poll::Ready(Some(item)); + } + } + None => return Poll::Ready(None), + } + } + } else { + this.stream.poll_next(cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + if self.predicate.is_some() { + return (0, upper); + } + + (lower, upper) + } +} diff --git a/third_party/rust/tokio/src/stream/stream_map.rs b/third_party/rust/tokio/src/stream/stream_map.rs new file mode 100644 index 0000000000..2f60ea4dda --- /dev/null +++ b/third_party/rust/tokio/src/stream/stream_map.rs @@ -0,0 +1,503 @@ +use crate::stream::Stream; + +use std::borrow::Borrow; +use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Combine many streams into one, indexing each source stream with a unique +/// key. +/// +/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source +/// streams into a single merged stream that yields values in the order that +/// they arrive from the source streams. However, `StreamMap` has a lot more +/// flexibility in usage patterns. +/// +/// `StreamMap` can: +/// +/// * Merge an arbitrary number of streams. +/// * Track which source stream the value was received from. +/// * Handle inserting and removing streams from the set of managed streams at +/// any point during iteration. +/// +/// All source streams held by `StreamMap` are indexed using a key. This key is +/// included with the value when a source stream yields a value. The key is also +/// used to remove the stream from the `StreamMap` before the stream has +/// completed streaming. +/// +/// # `Unpin` +/// +/// Because the `StreamMap` API moves streams during runtime, both streams and +/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a +/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to +/// pin the stream in the heap. +/// +/// # Implementation +/// +/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this +/// internal implementation detail will persist in future versions, but it is +/// important to know the runtime implications. In general, `StreamMap` works +/// best with a "smallish" number of streams as all entries are scanned on +/// insert, remove, and polling. In cases where a large number of streams need +/// to be merged, it may be advisable to use tasks sending values on a shared +/// [`mpsc`] channel. +/// +/// [`StreamExt::merge`]: crate::stream::StreamExt::merge +/// [`mpsc`]: crate::sync::mpsc +/// [`pin!`]: macro@pin +/// [`Box::pin`]: std::boxed::Box::pin +/// +/// # Examples +/// +/// Merging two streams, then remove them after receiving the first value +/// +/// ``` +/// use tokio::stream::{StreamExt, StreamMap}; +/// use tokio::sync::mpsc; +/// +/// #[tokio::main] +/// async fn main() { +/// let (mut tx1, rx1) = mpsc::channel(10); +/// let (mut tx2, rx2) = mpsc::channel(10); +/// +/// tokio::spawn(async move { +/// tx1.send(1).await.unwrap(); +/// +/// // This value will never be received. The send may or may not return +/// // `Err` depending on if the remote end closed first or not. +/// let _ = tx1.send(2).await; +/// }); +/// +/// tokio::spawn(async move { +/// tx2.send(3).await.unwrap(); +/// let _ = tx2.send(4).await; +/// }); +/// +/// let mut map = StreamMap::new(); +/// +/// // Insert both streams +/// map.insert("one", rx1); +/// map.insert("two", rx2); +/// +/// // Read twice +/// for _ in 0..2 { +/// let (key, val) = map.next().await.unwrap(); +/// +/// if key == "one" { +/// assert_eq!(val, 1); +/// } else { +/// assert_eq!(val, 3); +/// } +/// +/// // Remove the stream to prevent reading the next value +/// map.remove(key); +/// } +/// } +/// ``` +/// +/// This example models a read-only client to a chat system with channels. The +/// client sends commands to join and leave channels. `StreamMap` is used to +/// manage active channel subscriptions. +/// +/// For simplicity, messages are displayed with `println!`, but they could be +/// sent to the client over a socket. +/// +/// ```no_run +/// use tokio::stream::{Stream, StreamExt, StreamMap}; +/// +/// enum Command { +/// Join(String), +/// Leave(String), +/// } +/// +/// fn commands() -> impl Stream<Item = Command> { +/// // Streams in user commands by parsing `stdin`. +/// # tokio::stream::pending() +/// } +/// +/// // Join a channel, returns a stream of messages received on the channel. +/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { +/// // left as an exercise to the reader +/// # tokio::stream::pending() +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let mut channels = StreamMap::new(); +/// +/// // Input commands (join / leave channels). +/// let cmds = commands(); +/// tokio::pin!(cmds); +/// +/// loop { +/// tokio::select! { +/// Some(cmd) = cmds.next() => { +/// match cmd { +/// Command::Join(chan) => { +/// // Join the channel and add it to the `channels` +/// // stream map +/// let msgs = join(&chan); +/// channels.insert(chan, msgs); +/// } +/// Command::Leave(chan) => { +/// channels.remove(&chan); +/// } +/// } +/// } +/// Some((chan, msg)) = channels.next() => { +/// // Received a message, display it on stdout with the channel +/// // it originated from. +/// println!("{}: {}", chan, msg); +/// } +/// // Both the `commands` stream and the `channels` stream are +/// // complete. There is no more work to do, so leave the loop. +/// else => break, +/// } +/// } +/// } +/// ``` +#[derive(Debug, Default)] +pub struct StreamMap<K, V> { + /// Streams stored in the map + entries: Vec<(K, V)>, +} + +impl<K, V> StreamMap<K, V> { + /// Creates an empty `StreamMap`. + /// + /// The stream map is initially created with a capacity of `0`, so it will + /// not allocate until it is first inserted into. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); + /// ``` + pub fn new() -> StreamMap<K, V> { + StreamMap { entries: vec![] } + } + + /// Creates an empty `StreamMap` with the specified capacity. + /// + /// The stream map will be able to hold at least `capacity` elements without + /// reallocating. If `capacity` is 0, the stream map will not allocate. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); + /// ``` + pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { + StreamMap { + entries: Vec::with_capacity(capacity), + } + } + + /// Returns an iterator visiting all keys in arbitrary order. + /// + /// The iterator element type is &'a K. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for key in map.keys() { + /// println!("{}", key); + /// } + /// ``` + pub fn keys(&self) -> impl Iterator<Item = &K> { + self.entries.iter().map(|(k, _)| k) + } + + /// An iterator visiting all values in arbitrary order. + /// + /// The iterator element type is &'a V. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for stream in map.values() { + /// println!("{:?}", stream); + /// } + /// ``` + pub fn values(&self) -> impl Iterator<Item = &V> { + self.entries.iter().map(|(_, v)| v) + } + + /// An iterator visiting all values mutably in arbitrary order. + /// + /// The iterator element type is &'a mut V. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// map.insert("a", pending::<i32>()); + /// map.insert("b", pending()); + /// map.insert("c", pending()); + /// + /// for stream in map.values_mut() { + /// println!("{:?}", stream); + /// } + /// ``` + pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { + self.entries.iter_mut().map(|(_, v)| v) + } + + /// Returns the number of streams the map can hold without reallocating. + /// + /// This number is a lower bound; the `StreamMap` might be able to hold + /// more, but is guaranteed to be able to hold at least this many. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, Pending}; + /// + /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); + /// assert!(map.capacity() >= 100); + /// ``` + pub fn capacity(&self) -> usize { + self.entries.capacity() + } + + /// Returns the number of streams in the map. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// assert_eq!(a.len(), 0); + /// a.insert(1, pending::<i32>()); + /// assert_eq!(a.len(), 1); + /// ``` + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Returns `true` if the map contains no elements. + /// + /// # Examples + /// + /// ``` + /// use std::collections::HashMap; + /// + /// let mut a = HashMap::new(); + /// assert!(a.is_empty()); + /// a.insert(1, "a"); + /// assert!(!a.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Clears the map, removing all key-stream pairs. Keeps the allocated + /// memory for reuse. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut a = StreamMap::new(); + /// a.insert(1, pending::<i32>()); + /// a.clear(); + /// assert!(a.is_empty()); + /// ``` + pub fn clear(&mut self) { + self.entries.clear(); + } + + /// Insert a key-stream pair into the map. + /// + /// If the map did not have this key present, `None` is returned. + /// + /// If the map did have this key present, the new `stream` replaces the old + /// one and the old stream is returned. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// + /// assert!(map.insert(37, pending::<i32>()).is_none()); + /// assert!(!map.is_empty()); + /// + /// map.insert(37, pending()); + /// assert!(map.insert(37, pending()).is_some()); + /// ``` + pub fn insert(&mut self, k: K, stream: V) -> Option<V> + where + K: Hash + Eq, + { + let ret = self.remove(&k); + self.entries.push((k, stream)); + + ret + } + + /// Removes a key from the map, returning the stream at the key if the key was previously in the map. + /// + /// The key may be any borrowed form of the map's key type, but `Hash` and + /// `Eq` on the borrowed form must match those for the key type. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// map.insert(1, pending::<i32>()); + /// assert!(map.remove(&1).is_some()); + /// assert!(map.remove(&1).is_none()); + /// ``` + pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> + where + K: Borrow<Q>, + Q: Hash + Eq, + { + for i in 0..self.entries.len() { + if self.entries[i].0.borrow() == k { + return Some(self.entries.swap_remove(i).1); + } + } + + None + } + + /// Returns `true` if the map contains a stream for the specified key. + /// + /// The key may be any borrowed form of the map's key type, but `Hash` and + /// `Eq` on the borrowed form must match those for the key type. + /// + /// # Examples + /// + /// ``` + /// use tokio::stream::{StreamMap, pending}; + /// + /// let mut map = StreamMap::new(); + /// map.insert(1, pending::<i32>()); + /// assert_eq!(map.contains_key(&1), true); + /// assert_eq!(map.contains_key(&2), false); + /// ``` + pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool + where + K: Borrow<Q>, + Q: Hash + Eq, + { + for i in 0..self.entries.len() { + if self.entries[i].0.borrow() == k { + return true; + } + } + + false + } +} + +impl<K, V> StreamMap<K, V> +where + K: Unpin, + V: Stream + Unpin, +{ + /// Polls the next value, includes the vec entry index + fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { + use Poll::*; + + let start = crate::util::thread_rng_n(self.entries.len() as u32) as usize; + let mut idx = start; + + for _ in 0..self.entries.len() { + let (_, stream) = &mut self.entries[idx]; + + match Pin::new(stream).poll_next(cx) { + Ready(Some(val)) => return Ready(Some((idx, val))), + Ready(None) => { + // Remove the entry + self.entries.swap_remove(idx); + + // Check if this was the last entry, if so the cursor needs + // to wrap + if idx == self.entries.len() { + idx = 0; + } else if idx < start && start <= self.entries.len() { + // The stream being swapped into the current index has + // already been polled, so skip it. + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + Pending => { + idx = idx.wrapping_add(1) % self.entries.len(); + } + } + } + + // If the map is empty, then the stream is complete. + if self.entries.is_empty() { + Ready(None) + } else { + Pending + } + } +} + +impl<K, V> Stream for StreamMap<K, V> +where + K: Clone + Unpin, + V: Stream + Unpin, +{ + type Item = (K, V::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { + let key = self.entries[idx].0.clone(); + Poll::Ready(Some((key, val))) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let mut ret = (0, Some(0)); + + for (_, stream) in &self.entries { + let hint = stream.size_hint(); + + ret.0 += hint.0; + + match (ret.1, hint.1) { + (Some(a), Some(b)) => ret.1 = Some(a + b), + (Some(_), None) => ret.1 = None, + _ => {} + } + } + + ret + } +} diff --git a/third_party/rust/tokio/src/stream/take.rs b/third_party/rust/tokio/src/stream/take.rs new file mode 100644 index 0000000000..a92430b77c --- /dev/null +++ b/third_party/rust/tokio/src/stream/take.rs @@ -0,0 +1,76 @@ +use crate::stream::Stream; + +use core::cmp; +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`take`](super::StreamExt::take) method. + #[must_use = "streams do nothing unless polled"] + pub struct Take<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Take<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Take") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Take<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Take<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if *self.as_mut().project().remaining > 0 { + self.as_mut().project().stream.poll_next(cx).map(|ready| { + match &ready { + Some(_) => { + *self.as_mut().project().remaining -= 1; + } + None => { + *self.as_mut().project().remaining = 0; + } + } + ready + }) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.remaining == 0 { + return (0, Some(0)); + } + + let (lower, upper) = self.stream.size_hint(); + + let lower = cmp::min(lower, self.remaining as usize); + + let upper = match upper { + Some(x) if x < self.remaining as usize => Some(x), + _ => Some(self.remaining as usize), + }; + + (lower, upper) + } +} diff --git a/third_party/rust/tokio/src/stream/take_while.rs b/third_party/rust/tokio/src/stream/take_while.rs new file mode 100644 index 0000000000..cf1e160613 --- /dev/null +++ b/third_party/rust/tokio/src/stream/take_while.rs @@ -0,0 +1,79 @@ +use crate::stream::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`take_while`](super::StreamExt::take_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct TakeWhile<St, F> { + #[pin] + stream: St, + predicate: F, + done: bool, + } +} + +impl<St, F> fmt::Debug for TakeWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TakeWhile") + .field("stream", &self.stream) + .field("done", &self.done) + .finish() + } +} + +impl<St, F> TakeWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate, + done: false, + } + } +} + +impl<St, F> Stream for TakeWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if !*self.as_mut().project().done { + self.as_mut().project().stream.poll_next(cx).map(|ready| { + let ready = ready.and_then(|item| { + if !(self.as_mut().project().predicate)(&item) { + None + } else { + Some(item) + } + }); + + if ready.is_none() { + *self.as_mut().project().done = true; + } + + ready + }) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.done { + return (0, Some(0)); + } + + let (_, upper) = self.stream.size_hint(); + + (0, upper) + } +} diff --git a/third_party/rust/tokio/src/stream/timeout.rs b/third_party/rust/tokio/src/stream/timeout.rs new file mode 100644 index 0000000000..b8a2024f6a --- /dev/null +++ b/third_party/rust/tokio/src/stream/timeout.rs @@ -0,0 +1,65 @@ +use crate::stream::{Fuse, Stream}; +use crate::time::{Delay, Elapsed, Instant}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Timeout<S> { + #[pin] + stream: Fuse<S>, + deadline: Delay, + duration: Duration, + poll_deadline: bool, + } +} + +impl<S: Stream> Timeout<S> { + pub(super) fn new(stream: S, duration: Duration) -> Self { + let next = Instant::now() + duration; + let deadline = Delay::new_timeout(next, duration); + + Timeout { + stream: Fuse::new(stream), + deadline, + duration, + poll_deadline: true, + } + } +} + +impl<S: Stream> Stream for Timeout<S> { + type Item = Result<S::Item, Elapsed>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + match self.as_mut().project().stream.poll_next(cx) { + Poll::Ready(v) => { + if v.is_some() { + let next = Instant::now() + self.duration; + self.as_mut().project().deadline.reset(next); + *self.as_mut().project().poll_deadline = true; + } + return Poll::Ready(v.map(Ok)); + } + Poll::Pending => {} + }; + + if self.poll_deadline { + ready!(Pin::new(self.as_mut().project().deadline).poll(cx)); + *self.as_mut().project().poll_deadline = false; + return Poll::Ready(Some(Err(Elapsed::new()))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/third_party/rust/tokio/src/stream/try_next.rs b/third_party/rust/tokio/src/stream/try_next.rs new file mode 100644 index 0000000000..59e0eb1a41 --- /dev/null +++ b/third_party/rust/tokio/src/stream/try_next.rs @@ -0,0 +1,30 @@ +use crate::stream::{Next, Stream}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// Future for the [`try_next`](super::StreamExt::try_next) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct TryNext<'a, St: ?Sized> { + inner: Next<'a, St>, +} + +impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {} + +impl<'a, St: ?Sized> TryNext<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Self { + inner: Next::new(stream), + } + } +} + +impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> { + type Output = Result<Option<T>, E>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.inner).poll(cx).map(Option::transpose) + } +} |