use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::task::noop_context;
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Barrier};
use std::thread;

/// Pushes three oneshot receivers and checks that the queue yields each value
/// only after its sender has fired, and finally reports `Ready(None)`.
#[test]
fn basic_usage() {
    block_on(future::lazy(move |cx| {
        let mut queue = FuturesUnordered::new();
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();

        queue.push(rx1);
        queue.push(rx2);
        queue.push(rx3);

        // Nothing has been sent yet, so the queue must not be ready.
        assert!(!queue.poll_next_unpin(cx).is_ready());

        // The second receiver completes first even though it was pushed second.
        tx2.send("hello").unwrap();

        assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
        assert!(!queue.poll_next_unpin(cx).is_ready());

        tx1.send("world").unwrap();
        tx3.send("world2").unwrap();

        assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
        assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
        // All three futures have been yielded: the stream is exhausted.
        assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
    }));
}

/// Checks that a dropped sender surfaces as `Err(oneshot::Canceled)` from the
/// queue, and that the remaining futures are still yielded afterwards.
#[test]
fn resolving_errors() {
    block_on(future::lazy(move |cx| {
        let mut queue = FuturesUnordered::new();
        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();
        let (tx3, rx3) = oneshot::channel();

        queue.push(rx1);
        queue.push(rx2);
        queue.push(rx3);

        assert!(!queue.poll_next_unpin(cx).is_ready());

        // Dropping a sender without sending resolves its receiver with `Canceled`.
        drop(tx2);

        assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
        assert!(!queue.poll_next_unpin(cx).is_ready());

        drop(tx1);
        tx3.send("world2").unwrap();

        assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
        assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
        assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
    }));
}

/// Checks that dropping the queue is observed as cancellation by every sender
/// whose receiver had been pushed into it.
#[test]
fn dropping_ready_queue() {
    block_on(future::lazy(move |_| {
        let queue = FuturesUnordered::new();
        let (mut tx1, rx1) = oneshot::channel::<()>();
        let (mut tx2, rx2) = oneshot::channel::<()>();
        let (mut tx3, rx3) = oneshot::channel::<()>();

        queue.push(rx1);
        queue.push(rx2);
        queue.push(rx3);

        {
            let cx = &mut noop_context();

            // While the queue is alive, no sender sees a cancellation.
            assert!(!tx1.poll_canceled(cx).is_ready());
            assert!(!tx2.poll_canceled(cx).is_ready());
            assert!(!tx3.poll_canceled(cx).is_ready());

            drop(queue);

            // After the queue is dropped, every sender sees the cancellation.
            assert!(tx1.poll_canceled(cx).is_ready());
            assert!(tx2.poll_canceled(cx).is_ready());
            assert!(tx3.poll_canceled(cx).is_ready());
        }
    }));
}

/// Repeatedly fills the queue with receivers whose senders fire from separate
/// threads, then checks that every sent value comes out exactly once.
///
/// Runs `ITER` outer iterations (fewer under Miri); each one picks `n` in
/// `1..=10` and performs five rounds that reuse the same queue.
#[test]
fn stress() {
    const ITER: usize = if cfg!(miri) { 30 } else { 300 };

    for i in 0..ITER {
        let n = (i % 10) + 1;

        let mut queue = FuturesUnordered::new();

        for _ in 0..5 {
            // `n` sender threads plus this thread all meet at the barrier.
            let barrier = Arc::new(Barrier::new(n + 1));

            for num in 0..n {
                let barrier = barrier.clone();
                let (tx, rx) = oneshot::channel();

                queue.push(rx);

                thread::spawn(move || {
                    barrier.wait();
                    tx.send(num).unwrap();
                });
            }

            // Releases all sender threads together with this thread.
            barrier.wait();

            let mut sync = block_on_stream(queue);

            // Borrow the blocking stream so the queue can be recovered below.
            let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();

            assert_eq!(rx.len(), n);

            // Arrival order is not checked; after sorting the values must be
            // exactly 0, 1, ..., n - 1.
            rx.sort_unstable();

            for (i, x) in rx.into_iter().enumerate() {
                assert_eq!(i, x);
            }

            // Take the queue back so the next round reuses it.
            queue = sync.into_inner();
        }
    }
}

/// Checks that a future which panics while being polled is removed from the
/// queue: the panic propagates to the caller, after which the queue is empty
/// and reports `Ready(None)`.
#[test]
fn panicking_future_dropped() {
    block_on(future::lazy(move |cx| {
        let mut queue = FuturesUnordered::new();
        // The future never produces a value. The explicit output type only lets
        // the closure type-check and gives the final `assert_eq!` a comparable,
        // printable item type.
        queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));

        let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
        assert!(r.is_err());
        assert!(queue.is_empty());
        assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
    }));
}