summaryrefslogtreecommitdiffstats
path: root/vendor/futures/tests/stream_try_stream.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:39 +0000
commit1376c5a617be5c25655d0d7cb63e3beaa5a6e026 (patch)
tree3bb8d61aee02bc7a15eab3f36e3b921afc2075d0 /vendor/futures/tests/stream_try_stream.rs
parentReleasing progress-linux version 1.69.0+dfsg1-1~progress7.99u1. (diff)
downloadrustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.tar.xz
rustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.zip
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures/tests/stream_try_stream.rs')
-rw-r--r--vendor/futures/tests/stream_try_stream.rs98
1 files changed, 97 insertions, 1 deletions
diff --git a/vendor/futures/tests/stream_try_stream.rs b/vendor/futures/tests/stream_try_stream.rs
index 194e74db7..b3d04b920 100644
--- a/vendor/futures/tests/stream_try_stream.rs
+++ b/vendor/futures/tests/stream_try_stream.rs
@@ -1,7 +1,12 @@
+use core::pin::Pin;
+
use futures::{
- stream::{self, StreamExt, TryStreamExt},
+ stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
task::Poll,
+ Stream,
};
+use futures_executor::block_on;
+use futures_task::Context;
use futures_test::task::noop_context;
#[test]
@@ -36,3 +41,94 @@ fn try_take_while_after_err() {
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}
+
+#[test]
+fn try_flatten_unordered() {
+ let test_st = stream::iter(1..7)
+ .map(|val: u32| {
+ if val % 2 == 0 {
+ Ok(stream::unfold((val, 1), |(val, pow)| async move {
+ Some((val.pow(pow), (val, pow + 1)))
+ })
+ .take(3)
+ .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) }))
+ } else {
+ Err(val)
+ }
+ })
+ .map_ok(Box::pin)
+ .try_flatten_unordered(None);
+
+ block_on(async move {
+ assert_eq!(
+ // All numbers can be divided by 16 and odds must be `Err`
+ // For all basic evens we must have powers from 1 to 3
+ vec![
+ Err(1),
+ Err(3),
+ Err(5),
+ Ok(2),
+ Ok(4),
+ Ok(6),
+ Ok(4),
+ Err(16),
+ Ok(36),
+ Ok(8),
+ Err(64),
+ Ok(216)
+ ],
+ test_st.collect::<Vec<_>>().await
+ )
+ });
+
+ #[derive(Clone, Debug)]
+ struct ErrorStream {
+ error_after: usize,
+ polled: usize,
+ }
+
+ impl Stream for ErrorStream {
+ type Item = Result<Repeat<Result<(), ()>>, ()>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
+ if self.polled > self.error_after {
+ panic!("Polled after error");
+ } else {
+ let out =
+ if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) };
+ self.polled += 1;
+ Poll::Ready(Some(out))
+ }
+ }
+ }
+
+ block_on(async move {
+ let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
+ let mut ctr = 0;
+ while (st.try_next().await).is_ok() {
+ ctr += 1;
+ }
+ assert_eq!(ctr, 0);
+
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .try_flatten_unordered(None)
+ .inspect_ok(|_| panic!("Unexpected `Ok`"))
+ .try_collect::<Vec<_>>()
+ .await,
+ Err(())
+ );
+
+ let mut taken = 0;
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .map_ok(|st| st.take(3))
+ .try_flatten_unordered(1)
+ .inspect(|_| taken += 1)
+ .try_fold((), |(), res| async move { Ok(res) })
+ .await,
+ Err(())
+ );
+ assert_eq!(taken, 31);
+ })
+}