diff options
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext')
21 files changed, 1566 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/all.rs b/third_party/rust/tokio-stream/src/stream_ext/all.rs new file mode 100644 index 0000000000..b4dbc1e97c --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/all.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`all`](super::StreamExt::all) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct AllFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized, F> AllFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { + stream, + f, + _pin: PhantomPinned, + } + } +} + +impl<St, F> Future for AllFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + let mut stream = Pin::new(me.stream); + + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if !(me.f)(v) { + return Poll::Ready(false); + } + } + None => return Poll::Ready(true), + } + } + + cx.waker().wake_by_ref(); + Poll::Pending + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/any.rs b/third_party/rust/tokio-stream/src/stream_ext/any.rs new file mode 100644 index 0000000000..31394f249b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/any.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`any`](super::StreamExt::any) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct AnyFuture<'a, St: ?Sized, F> { + stream: &'a mut St, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { + pub(super) fn new(stream: &'a mut St, f: F) -> Self { + Self { + stream, + f, + _pin: PhantomPinned, + } + } +} + +impl<St, F> Future for AnyFuture<'_, St, F> +where + St: ?Sized + Stream + Unpin, + F: FnMut(St::Item) -> bool, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + let mut stream = Pin::new(me.stream); + + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + return Poll::Ready(true); + } + } + None => return Poll::Ready(false), + } + } + + cx.waker().wake_by_ref(); + Poll::Pending + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/chain.rs b/third_party/rust/tokio-stream/src/stream_ext/chain.rs new file mode 100644 index 0000000000..bd64f33ce4 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/chain.rs @@ -0,0 +1,50 @@ +use crate::stream_ext::Fuse; +use crate::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`chain`](super::StreamExt::chain) method. + pub struct Chain<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: U, + } +} + +impl<T, U> Chain<T, U> { + pub(super) fn new(a: T, b: U) -> Chain<T, U> + where + T: Stream, + U: Stream, + { + Chain { a: Fuse::new(a), b } + } +} + +impl<T, U> Stream for Chain<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + use Poll::Ready; + + let me = self.project(); + + if let Some(v) = ready!(me.a.poll_next(cx)) { + return Ready(Some(v)); + } + + me.b.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs new file mode 100644 index 0000000000..48acd9328b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs @@ -0,0 +1,86 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::{sleep, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct ChunksTimeout<S: Stream> { + #[pin] + stream: Fuse<S>, + #[pin] + deadline: Option<Sleep>, + duration: Duration, + items: Vec<S::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl<S: Stream> ChunksTimeout<S> { + pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self { + ChunksTimeout { + stream: Fuse::new(stream), + deadline: None, + duration, + items: Vec::with_capacity(max_size), + cap: max_size, + } + } +} + +impl<S: Stream> Stream for ChunksTimeout<S> { + type Item = Vec<S::Item>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.as_mut().project(); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending => break, + Poll::Ready(Some(item)) => { + if me.items.is_empty() { + me.deadline.set(Some(sleep(*me.duration))); + me.items.reserve_exact(*me.cap); + } + me.items.push(item); + if me.items.len() >= *me.cap { + return Poll::Ready(Some(std::mem::take(me.items))); + } + } + Poll::Ready(None) => { + // Returning Some here is only correct because we fuse the inner stream. + let last = if me.items.is_empty() { + None + } else { + Some(std::mem::take(me.items)) + }; + + return Poll::Ready(last); + } + } + } + + if !me.items.is_empty() { + if let Some(deadline) = me.deadline.as_pin_mut() { + ready!(deadline.poll(cx)); + } + return Poll::Ready(Some(std::mem::take(me.items))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = (lower / self.cap).saturating_add(chunk_len); + let upper = upper.and_then(|x| x.checked_add(chunk_len)); + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/collect.rs b/third_party/rust/tokio-stream/src/stream_ext/collect.rs new file mode 100644 index 0000000000..8548b74556 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/collect.rs @@ -0,0 +1,229 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +// Do not export this struct until `FromStream` can be unsealed. +pin_project! { + /// Future returned by the [`collect`](super::StreamExt::collect) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Collect<T, U> + where + T: Stream, + U: FromStream<T::Item>, + { + #[pin] + stream: T, + collection: U::InternalCollection, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +/// Convert from a [`Stream`](crate::Stream). +/// +/// This trait is not intended to be used directly. Instead, call +/// [`StreamExt::collect()`](super::StreamExt::collect). +/// +/// # Implementing +/// +/// Currently, this trait may not be implemented by third parties. The trait is +/// sealed in order to make changes in the future. Stabilization is pending +/// enhancements to the Rust language. +pub trait FromStream<T>: sealed::FromStreamPriv<T> {} + +impl<T, U> Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + pub(super) fn new(stream: T) -> Collect<T, U> { + let (lower, upper) = stream.size_hint(); + let collection = U::initialize(sealed::Internal, lower, upper); + + Collect { + stream, + collection, + _pin: PhantomPinned, + } + } +} + +impl<T, U> Future for Collect<T, U> +where + T: Stream, + U: FromStream<T::Item>, +{ + type Output = U; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> { + use Poll::Ready; + + loop { + let me = self.as_mut().project(); + + let item = match ready!(me.stream.poll_next(cx)) { + Some(item) => item, + None => { + return Ready(U::finalize(sealed::Internal, me.collection)); + } + }; + + if !U::extend(sealed::Internal, me.collection, item) { + return Ready(U::finalize(sealed::Internal, me.collection)); + } + } + } +} + +// ===== FromStream implementations + +impl FromStream<()> for () {} + +impl sealed::FromStreamPriv<()> for () { + type InternalCollection = (); + + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {} + + fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool { + true + } + + fn finalize(_: sealed::Internal, _collection: &mut ()) {} +} + +impl<T: AsRef<str>> FromStream<T> for String {} + +impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { + type InternalCollection = String; + + fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String { + String::new() + } + + fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool { + collection.push_str(item.as_ref()); + true + } + + fn finalize(_: sealed::Internal, collection: &mut String) -> String { + mem::take(collection) + } +} + +impl<T> FromStream<T> for Vec<T> {} + +impl<T> sealed::FromStreamPriv<T> for Vec<T> { + type InternalCollection = Vec<T>; + + fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> { + Vec::with_capacity(lower) + } + + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { + mem::take(collection) + } +} + +impl<T> FromStream<T> for Box<[T]> {} + +impl<T> sealed::FromStreamPriv<T> for Box<[T]> { + type InternalCollection = Vec<T>; + + fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> { + <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper) + } + + fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool { + <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item) + } + + fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> { + <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection) + .into_boxed_slice() + } +} + +impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {} + +impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E> +where + U: FromStream<T>, +{ + type InternalCollection = Result<U::InternalCollection, E>; + + fn initialize( + _: sealed::Internal, + lower: usize, + upper: Option<usize>, + ) -> Result<U::InternalCollection, E> { + Ok(U::initialize(sealed::Internal, lower, upper)) + } + + fn extend( + _: sealed::Internal, + collection: &mut Self::InternalCollection, + item: Result<T, E>, + ) -> bool { + assert!(collection.is_ok()); + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(sealed::Internal, collection, item) + } + Err(err) => { + *collection = Err(err); + false + } + } + } + + fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> { + if let Ok(collection) = collection.as_mut() { + Ok(U::finalize(sealed::Internal, collection)) + } else { + let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); + + Err(res.map(drop).unwrap_err()) + } + } +} + +pub(crate) mod sealed { + #[doc(hidden)] + pub trait FromStreamPriv<T> { + /// Intermediate type used during collection process + /// + /// The name of this type is internal and cannot be relied upon. + type InternalCollection; + + /// Initialize the collection + fn initialize( + internal: Internal, + lower: usize, + upper: Option<usize>, + ) -> Self::InternalCollection; + + /// Extend the collection with the received item + /// + /// Return `true` to continue streaming, `false` complete collection. + fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool; + + /// Finalize collection into target type. + fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self; + } + + #[allow(missing_debug_implementations)] + pub struct Internal; +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter.rs b/third_party/rust/tokio-stream/src/stream_ext/filter.rs new file mode 100644 index 0000000000..f3dd8716b4 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/filter.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`filter`](super::StreamExt::filter) method. + #[must_use = "streams do nothing unless polled"] + pub struct Filter<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Filter<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Filter") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> Filter<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St, F> Stream for Filter<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if (self.as_mut().project().f)(&e) { + return Poll::Ready(Some(e)); + } + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs new file mode 100644 index 0000000000..fe604a6f4b --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/filter_map.rs @@ -0,0 +1,58 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`filter_map`](super::StreamExt::filter_map) method. + #[must_use = "streams do nothing unless polled"] + pub struct FilterMap<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for FilterMap<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FilterMap") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> FilterMap<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f } + } +} + +impl<St, F, T> Stream for FilterMap<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if let Some(e) = (self.as_mut().project().f)(e) { + return Poll::Ready(Some(e)); + } + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/fold.rs b/third_party/rust/tokio-stream/src/stream_ext/fold.rs new file mode 100644 index 0000000000..e2e97d8f37 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/fold.rs @@ -0,0 +1,57 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future returned by the [`fold`](super::StreamExt::fold) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct FoldFuture<St, B, F> { + #[pin] + stream: St, + acc: Option<B>, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<St, B, F> FoldFuture<St, B, F> { + pub(super) fn new(stream: St, init: B, f: F) -> Self { + Self { + stream, + acc: Some(init), + f, + _pin: PhantomPinned, + } + } +} + +impl<St, B, F> Future for FoldFuture<St, B, F> +where + St: Stream, + F: FnMut(B, St::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + let next = ready!(me.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let old = me.acc.take().unwrap(); + let new = (me.f)(old, v); + *me.acc = Some(new); + } + None => return Poll::Ready(me.acc.take().unwrap()), + } + } + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/fuse.rs b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs new file mode 100644 index 0000000000..2500641d95 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/fuse.rs @@ -0,0 +1,53 @@ +use crate::Stream; + +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Stream returned by [`fuse()`][super::StreamExt::fuse]. + #[derive(Debug)] + pub struct Fuse<T> { + #[pin] + stream: Option<T>, + } +} + +impl<T> Fuse<T> +where + T: Stream, +{ + pub(crate) fn new(stream: T) -> Fuse<T> { + Fuse { + stream: Some(stream), + } + } +} + +impl<T> Stream for Fuse<T> +where + T: Stream, +{ + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let res = match Option::as_pin_mut(self.as_mut().project().stream) { + Some(stream) => ready!(stream.poll_next(cx)), + None => return Poll::Ready(None), + }; + + if res.is_none() { + // Do not poll the stream anymore + self.as_mut().project().stream.set(None); + } + + Poll::Ready(res) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self.stream { + Some(ref stream) => stream.size_hint(), + None => (0, Some(0)), + } + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/map.rs b/third_party/rust/tokio-stream/src/stream_ext/map.rs new file mode 100644 index 0000000000..e6b47cd258 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/map.rs @@ -0,0 +1,51 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for Map<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Map").field("stream", &self.stream).finish() + } +} + +impl<St, F> Map<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Map { stream, f } + } +} + +impl<St, F, T> Stream for Map<St, F> +where + St: Stream, + F: FnMut(St::Item) -> T, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.as_mut() + .project() + .stream + .poll_next(cx) + .map(|opt| opt.map(|x| (self.as_mut().project().f)(x))) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/map_while.rs b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs new file mode 100644 index 0000000000..d4fd825656 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/map_while.rs @@ -0,0 +1,52 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`map_while`](super::StreamExt::map_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct MapWhile<St, F> { + #[pin] + stream: St, + f: F, + } +} + +impl<St, F> fmt::Debug for MapWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MapWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> MapWhile<St, F> { + pub(super) fn new(stream: St, f: F) -> Self { + MapWhile { stream, f } + } +} + +impl<St, F, T> Stream for MapWhile<St, F> +where + St: Stream, + F: FnMut(St::Item) -> Option<T>, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let me = self.project(); + let f = me.f; + me.stream.poll_next(cx).map(|opt| opt.and_then(f)) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (_, upper) = self.stream.size_hint(); + (0, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/merge.rs b/third_party/rust/tokio-stream/src/stream_ext/merge.rs new file mode 100644 index 0000000000..9d5123c85a --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/merge.rs @@ -0,0 +1,90 @@ +use crate::stream_ext::Fuse; +use crate::Stream; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream returned by the [`merge`](super::StreamExt::merge) method. + pub struct Merge<T, U> { + #[pin] + a: Fuse<T>, + #[pin] + b: Fuse<U>, + // When `true`, poll `a` first, otherwise, `poll` b`. + a_first: bool, + } +} + +impl<T, U> Merge<T, U> { + pub(super) fn new(a: T, b: U) -> Merge<T, U> + where + T: Stream, + U: Stream, + { + Merge { + a: Fuse::new(a), + b: Fuse::new(b), + a_first: true, + } + } +} + +impl<T, U> Stream for Merge<T, U> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + let me = self.project(); + let a_first = *me.a_first; + + // Toggle the flag + *me.a_first = !a_first; + + if a_first { + poll_next(me.a, me.b, cx) + } else { + poll_next(me.b, me.a, cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + super::merge_size_hints(self.a.size_hint(), self.b.size_hint()) + } +} + +fn poll_next<T, U>( + first: Pin<&mut T>, + second: Pin<&mut U>, + cx: &mut Context<'_>, +) -> Poll<Option<T::Item>> +where + T: Stream, + U: Stream<Item = T::Item>, +{ + use Poll::*; + + let mut done = true; + + match first.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + match second.poll_next(cx) { + Ready(Some(val)) => return Ready(Some(val)), + Ready(None) => {} + Pending => done = false, + } + + if done { + Ready(None) + } else { + Pending + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/next.rs b/third_party/rust/tokio-stream/src/stream_ext/next.rs new file mode 100644 index 0000000000..706069fa6e --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/next.rs @@ -0,0 +1,44 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`next`](super::StreamExt::next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + /// + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Next<'a, St: ?Sized> { + stream: &'a mut St, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized> Next<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Next { + stream, + _pin: PhantomPinned, + } + } +} + +impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { + type Output = Option<St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + Pin::new(me.stream).poll_next(cx) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip.rs b/third_party/rust/tokio-stream/src/stream_ext/skip.rs new file mode 100644 index 0000000000..80a0a0aff0 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/skip.rs @@ -0,0 +1,63 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip`](super::StreamExt::skip) method. + #[must_use = "streams do nothing unless polled"] + pub struct Skip<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Skip<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Skip") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Skip<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Skip<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + match ready!(self.as_mut().project().stream.poll_next(cx)) { + Some(e) => { + if self.remaining == 0 { + return Poll::Ready(Some(e)); + } + *self.as_mut().project().remaining -= 1; + } + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_sub(self.remaining); + let upper = upper.map(|x| x.saturating_sub(self.remaining)); + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs new file mode 100644 index 0000000000..985a92666e --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/skip_while.rs @@ -0,0 +1,73 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct SkipWhile<St, F> { + #[pin] + stream: St, + predicate: Option<F>, + } +} + +impl<St, F> fmt::Debug for SkipWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SkipWhile") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, F> SkipWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate: Some(predicate), + } + } +} + +impl<St, F> Stream for SkipWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + if let Some(predicate) = this.predicate { + loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + if !(predicate)(&item) { + *this.predicate = None; + return Poll::Ready(Some(item)); + } + } + None => return Poll::Ready(None), + } + } + } else { + this.stream.poll_next(cx) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + if self.predicate.is_some() { + return (0, upper); + } + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/take.rs b/third_party/rust/tokio-stream/src/stream_ext/take.rs new file mode 100644 index 0000000000..c75648f606 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/take.rs @@ -0,0 +1,76 @@ +use crate::Stream; + +use core::cmp; +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`take`](super::StreamExt::take) method. + #[must_use = "streams do nothing unless polled"] + pub struct Take<St> { + #[pin] + stream: St, + remaining: usize, + } +} + +impl<St> fmt::Debug for Take<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Take") + .field("stream", &self.stream) + .finish() + } +} + +impl<St> Take<St> { + pub(super) fn new(stream: St, remaining: usize) -> Self { + Self { stream, remaining } + } +} + +impl<St> Stream for Take<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if *self.as_mut().project().remaining > 0 { + self.as_mut().project().stream.poll_next(cx).map(|ready| { + match &ready { + Some(_) => { + *self.as_mut().project().remaining -= 1; + } + None => { + *self.as_mut().project().remaining = 0; + } + } + ready + }) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.remaining == 0 { + return (0, Some(0)); + } + + let (lower, upper) = self.stream.size_hint(); + + let lower = cmp::min(lower, self.remaining as usize); + + let upper = match upper { + Some(x) if x < self.remaining as usize => Some(x), + _ => Some(self.remaining as usize), + }; + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/take_while.rs b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs new file mode 100644 index 0000000000..5ce4dd98a9 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/take_while.rs @@ -0,0 +1,79 @@ +use crate::Stream; + +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`take_while`](super::StreamExt::take_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct TakeWhile<St, F> { + #[pin] + stream: St, + predicate: F, + done: bool, + } +} + +impl<St, F> fmt::Debug for TakeWhile<St, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TakeWhile") + .field("stream", &self.stream) + .field("done", &self.done) + .finish() + } +} + +impl<St, F> TakeWhile<St, F> { + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate, + done: false, + } + } +} + +impl<St, F> Stream for TakeWhile<St, F> +where + St: Stream, + F: FnMut(&St::Item) -> bool, +{ + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if !*self.as_mut().project().done { + self.as_mut().project().stream.poll_next(cx).map(|ready| { + let ready = ready.and_then(|item| { + if !(self.as_mut().project().predicate)(&item) { + None + } else { + Some(item) + } + }); + + if ready.is_none() { + *self.as_mut().project().done = true; + } + + ready + }) + } else { + Poll::Ready(None) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.done { + return (0, Some(0)); + } + + let (_, upper) = self.stream.size_hint(); + + (0, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/then.rs b/third_party/rust/tokio-stream/src/stream_ext/then.rs new file mode 100644 index 0000000000..cc7caa721e --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/then.rs @@ -0,0 +1,83 @@ +use crate::Stream; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`then`](super::StreamExt::then) method. + #[must_use = "streams do nothing unless polled"] + pub struct Then<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } +} + +impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Then") + .field("stream", &self.stream) + .finish() + } +} + +impl<St, Fut, F> Then<St, Fut, F> { + pub(super) fn new(stream: St, f: F) -> Self { + Then { + stream, + future: None, + f, + } + } +} + +impl<St, F, Fut> Stream for Then<St, Fut, F> +where + St: Stream, + Fut: Future, + F: FnMut(St::Item) -> Fut, +{ + type Item = Fut::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> { + let mut me = self.project(); + + loop { + if let Some(future) = me.future.as_mut().as_pin_mut() { + match future.poll(cx) { + Poll::Ready(item) => { + me.future.set(None); + return Poll::Ready(Some(item)); + } + Poll::Pending => return Poll::Pending, + } + } + + match me.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => { + me.future.set(Some((me.f)(item))); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let future_len = usize::from(self.future.is_some()); + let (lower, upper) = self.stream.size_hint(); + + let lower = lower.saturating_add(future_len); + let upper = upper.and_then(|upper| upper.checked_add(future_len)); + + (lower, upper) + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/throttle.rs b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs new file mode 100644 index 0000000000..50001392ee --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs @@ -0,0 +1,96 @@ +//! Slow down a stream by enforcing a delay between items. + +use crate::Stream; +use tokio::time::{Duration, Instant, Sleep}; + +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +use pin_project_lite::pin_project; + +pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> +where + T: Stream, +{ + Throttle { + delay: tokio::time::sleep_until(Instant::now() + duration), + duration, + has_delayed: true, + stream, + } +} + +pin_project! { + /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to + /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Throttle<T> { + #[pin] + delay: Sleep, + duration: Duration, + + // Set to true when `delay` has returned ready, but `stream` hasn't. + has_delayed: bool, + + // The stream to throttle + #[pin] + stream: T, + } +} + +impl<T> Throttle<T> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } +} + +impl<T: Stream> Stream for Throttle<T> { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.project(); + let dur = *me.duration; + + if !*me.has_delayed && !is_zero(dur) { + ready!(me.delay.as_mut().poll(cx)); + *me.has_delayed = true; + } + + let value = ready!(me.stream.poll_next(cx)); + + if value.is_some() { + if !is_zero(dur) { + me.delay.reset(Instant::now() + dur); + } + + *me.has_delayed = false; + } + + Poll::Ready(value) + } +} + +fn is_zero(dur: Duration) -> bool { + dur == Duration::from_millis(0) +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs new file mode 100644 index 0000000000..a440d203ec --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/timeout.rs @@ -0,0 +1,107 @@ +use crate::stream_ext::Fuse; +use crate::Stream; +use tokio::time::{Instant, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::fmt; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct Timeout<S> { + #[pin] + stream: Fuse<S>, + #[pin] + deadline: Sleep, + duration: Duration, + poll_deadline: bool, + } +} + +/// Error returned by `Timeout`. +#[derive(Debug, PartialEq, Eq)] +pub struct Elapsed(()); + +impl<S: Stream> Timeout<S> { + pub(super) fn new(stream: S, duration: Duration) -> Self { + let next = Instant::now() + duration; + let deadline = tokio::time::sleep_until(next); + + Timeout { + stream: Fuse::new(stream), + deadline, + duration, + poll_deadline: true, + } + } +} + +impl<S: Stream> Stream for Timeout<S> { + type Item = Result<S::Item, Elapsed>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let me = self.project(); + + match me.stream.poll_next(cx) { + Poll::Ready(v) => { + if v.is_some() { + let next = Instant::now() + *me.duration; + me.deadline.reset(next); + *me.poll_deadline = true; + } + return Poll::Ready(v.map(Ok)); + } + Poll::Pending => {} + }; + + if *me.poll_deadline { + ready!(me.deadline.poll(cx)); + *me.poll_deadline = false; + return Poll::Ready(Some(Err(Elapsed::new()))); + } + + Poll::Pending + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let (lower, upper) = self.stream.size_hint(); + + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. + + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option<usize>) -> Option<usize> { + value?.checked_mul(2)?.checked_add(1) + } + + (lower, twice_plus_one(upper)) + } +} + +// ===== impl Elapsed ===== + +impl Elapsed { + pub(crate) fn new() -> Self { + Elapsed(()) + } +} + +impl fmt::Display for Elapsed { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + "deadline has elapsed".fmt(fmt) + } +} + +impl std::error::Error for Elapsed {} + +impl From<Elapsed> for std::io::Error { + fn from(_err: Elapsed) -> std::io::Error { + std::io::ErrorKind::TimedOut.into() + } +} diff --git a/third_party/rust/tokio-stream/src/stream_ext/try_next.rs b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs new file mode 100644 index 0000000000..93aa3bc15f --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/try_next.rs @@ -0,0 +1,45 @@ +use crate::stream_ext::Next; +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`try_next`](super::StreamExt::try_next) method. + /// + /// # Cancel safety + /// + /// This method is cancel safe. It only + /// holds onto a reference to the underlying stream, + /// so dropping it will never lose a value. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryNext<'a, St: ?Sized> { + #[pin] + inner: Next<'a, St>, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<'a, St: ?Sized> TryNext<'a, St> { + pub(super) fn new(stream: &'a mut St) -> Self { + Self { + inner: Next::new(stream), + _pin: PhantomPinned, + } + } +} + +impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> { + type Output = Result<Option<T>, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + me.inner.poll(cx).map(Option::transpose) + } +} |