diff options
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext/fold.rs')
-rw-r--r-- | third_party/rust/tokio-stream/src/stream_ext/fold.rs | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/fold.rs b/third_party/rust/tokio-stream/src/stream_ext/fold.rs new file mode 100644 index 0000000000..e2e97d8f37 --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/fold.rs @@ -0,0 +1,57 @@ +use crate::Stream; + +use core::future::Future; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future returned by the [`fold`](super::StreamExt::fold) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct FoldFuture<St, B, F> { + #[pin] + stream: St, + acc: Option<B>, + f: F, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl<St, B, F> FoldFuture<St, B, F> { + pub(super) fn new(stream: St, init: B, f: F) -> Self { + Self { + stream, + acc: Some(init), + f, + _pin: PhantomPinned, + } + } +} + +impl<St, B, F> Future for FoldFuture<St, B, F> +where + St: Stream, + F: FnMut(B, St::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + let next = ready!(me.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let old = me.acc.take().unwrap(); + let new = (me.f)(old, v); + *me.acc = Some(new); + } + None => return Poll::Ready(me.acc.take().unwrap()), + } + } + } +} |