use core::pin::Pin; use futures::{ stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, task::Poll, Stream, }; use futures_executor::block_on; use futures_task::Context; use futures_test::task::noop_context; #[test] fn try_filter_map_after_err() { let cx = &mut noop_context(); let mut s = stream::iter(1..=3) .map(Ok) .try_filter_map(|v| async move { Err::, _>(v) }) .filter_map(|r| async move { r.ok() }) .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } #[test] fn try_skip_while_after_err() { let cx = &mut noop_context(); let mut s = stream::iter(1..=3) .map(Ok) .try_skip_while(|_| async move { Err::<_, ()>(()) }) .filter_map(|r| async move { r.ok() }) .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } #[test] fn try_take_while_after_err() { let cx = &mut noop_context(); let mut s = stream::iter(1..=3) .map(Ok) .try_take_while(|_| async move { Err::<_, ()>(()) }) .filter_map(|r| async move { r.ok() }) .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } #[test] fn try_flatten_unordered() { let test_st = stream::iter(1..7) .map(|val: u32| { if val % 2 == 0 { Ok(stream::unfold((val, 1), |(val, pow)| async move { Some((val.pow(pow), (val, pow + 1))) }) .take(3) .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) })) } else { Err(val) } }) .map_ok(Box::pin) .try_flatten_unordered(None); block_on(async move { assert_eq!( // All numbers can be divided by 16 and odds must be `Err` // For all basic evens we must have powers from 1 to 3 vec![ Err(1), Err(3), Err(5), Ok(2), Ok(4), Ok(6), Ok(4), Err(16), Ok(36), Ok(8), Err(64), Ok(216) ], test_st.collect::>().await ) }); #[derive(Clone, Debug)] struct ErrorStream { error_after: usize, polled: usize, } impl Stream for ErrorStream { type Item = Result>, ()>; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll> { if self.polled > self.error_after { panic!("Polled after error"); } else { let out = if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) }; self.polled += 1; Poll::Ready(Some(out)) } } } block_on(async move { let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None); let mut ctr = 0; while (st.try_next().await).is_ok() { ctr += 1; } assert_eq!(ctr, 0); assert_eq!( ErrorStream { error_after: 10, polled: 0 } .try_flatten_unordered(None) .inspect_ok(|_| panic!("Unexpected `Ok`")) .try_collect::>() .await, Err(()) ); let mut taken = 0; assert_eq!( ErrorStream { error_after: 10, polled: 0 } .map_ok(|st| st.take(3)) .try_flatten_unordered(1) .inspect(|_| taken += 1) .try_fold((), |(), res| async move { Ok(res) }) .await, Err(()) ); assert_eq!(taken, 31); }) }