diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:20:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:20:39 +0000 |
commit | 1376c5a617be5c25655d0d7cb63e3beaa5a6e026 (patch) | |
tree | 3bb8d61aee02bc7a15eab3f36e3b921afc2075d0 /vendor/futures/tests/stream_try_stream.rs | |
parent | Releasing progress-linux version 1.69.0+dfsg1-1~progress7.99u1. (diff) | |
download | rustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.tar.xz rustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.zip |
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures/tests/stream_try_stream.rs')
-rw-r--r-- | vendor/futures/tests/stream_try_stream.rs | 98 |
1 files changed, 97 insertions, 1 deletions
diff --git a/vendor/futures/tests/stream_try_stream.rs b/vendor/futures/tests/stream_try_stream.rs index 194e74db7..b3d04b920 100644 --- a/vendor/futures/tests/stream_try_stream.rs +++ b/vendor/futures/tests/stream_try_stream.rs @@ -1,7 +1,12 @@ +use core::pin::Pin; + use futures::{ - stream::{self, StreamExt, TryStreamExt}, + stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, task::Poll, + Stream, }; +use futures_executor::block_on; +use futures_task::Context; use futures_test::task::noop_context; #[test] @@ -36,3 +41,94 @@ fn try_take_while_after_err() { .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } + +#[test] +fn try_flatten_unordered() { + let test_st = stream::iter(1..7) + .map(|val: u32| { + if val % 2 == 0 { + Ok(stream::unfold((val, 1), |(val, pow)| async move { + Some((val.pow(pow), (val, pow + 1))) + }) + .take(3) + .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) })) + } else { + Err(val) + } + }) + .map_ok(Box::pin) + .try_flatten_unordered(None); + + block_on(async move { + assert_eq!( + // All numbers can be divided by 16 and odds must be `Err` + // For all basic evens we must have powers from 1 to 3 + vec![ + Err(1), + Err(3), + Err(5), + Ok(2), + Ok(4), + Ok(6), + Ok(4), + Err(16), + Ok(36), + Ok(8), + Err(64), + Ok(216) + ], + test_st.collect::<Vec<_>>().await + ) + }); + + #[derive(Clone, Debug)] + struct ErrorStream { + error_after: usize, + polled: usize, + } + + impl Stream for ErrorStream { + type Item = Result<Repeat<Result<(), ()>>, ()>; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> { + if self.polled > self.error_after { + panic!("Polled after error"); + } else { + let out = + if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) }; + self.polled += 1; + Poll::Ready(Some(out)) + } + } + } + + block_on(async move { + let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None); + let mut ctr = 0; + while (st.try_next().await).is_ok() { + ctr += 1; + } + assert_eq!(ctr, 0); + + assert_eq!( + ErrorStream { error_after: 10, polled: 0 } + .try_flatten_unordered(None) + .inspect_ok(|_| panic!("Unexpected `Ok`")) + .try_collect::<Vec<_>>() + .await, + Err(()) + ); + + let mut taken = 0; + assert_eq!( + ErrorStream { error_after: 10, polled: 0 } + .map_ok(|st| st.take(3)) + .try_flatten_unordered(1) + .inspect(|_| taken += 1) + .try_fold((), |(), res| async move { Ok(res) }) + .await, + Err(()) + ); + assert_eq!(taken, 31); + }) +} |