use futures::channel::mpsc; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt}; use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; use futures::task::Poll; use futures_test::task::noop_context; #[test] fn is_terminated() { let mut cx = noop_context(); let mut tasks = SelectAll::new(); assert_eq!(tasks.is_terminated(), false); assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); assert_eq!(tasks.is_terminated(), true); // Test that the sentinel value doesn't leak assert_eq!(tasks.is_empty(), true); assert_eq!(tasks.len(), 0); tasks.push(future::ready(1).into_stream()); assert_eq!(tasks.is_empty(), false); assert_eq!(tasks.len(), 1); assert_eq!(tasks.is_terminated(), false); assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); assert_eq!(tasks.is_terminated(), false); assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); assert_eq!(tasks.is_terminated(), true); } #[test] fn issue_1626() { let a = stream::iter(0..=2); let b = stream::iter(10..=14); let mut s = block_on_stream(stream::select_all(vec![a, b])); assert_eq!(s.next(), Some(0)); assert_eq!(s.next(), Some(10)); assert_eq!(s.next(), Some(1)); assert_eq!(s.next(), Some(11)); assert_eq!(s.next(), Some(2)); assert_eq!(s.next(), Some(12)); assert_eq!(s.next(), Some(13)); assert_eq!(s.next(), Some(14)); assert_eq!(s.next(), None); } #[test] fn works_1() { let (a_tx, a_rx) = mpsc::unbounded::(); let (b_tx, b_rx) = mpsc::unbounded::(); let (c_tx, c_rx) = mpsc::unbounded::(); let streams = vec![a_rx, b_rx, c_rx]; let mut stream = block_on_stream(select_all(streams)); b_tx.unbounded_send(99).unwrap(); a_tx.unbounded_send(33).unwrap(); assert_eq!(Some(33), stream.next()); assert_eq!(Some(99), stream.next()); b_tx.unbounded_send(99).unwrap(); a_tx.unbounded_send(33).unwrap(); assert_eq!(Some(33), stream.next()); assert_eq!(Some(99), stream.next()); c_tx.unbounded_send(42).unwrap(); assert_eq!(Some(42), stream.next()); a_tx.unbounded_send(43).unwrap(); assert_eq!(Some(43), stream.next()); drop((a_tx, b_tx, c_tx)); assert_eq!(None, stream.next()); } #[test] fn clear() { let mut tasks = select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); assert_eq!(block_on(tasks.next()), Some(1)); assert!(!tasks.is_empty()); tasks.clear(); assert!(tasks.is_empty()); tasks.push(stream::iter(vec![3].into_iter())); assert!(!tasks.is_empty()); tasks.clear(); assert!(tasks.is_empty()); assert_eq!(block_on(tasks.next()), None); assert!(tasks.is_terminated()); tasks.clear(); assert!(!tasks.is_terminated()); } #[test] fn iter_mut() { let mut stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] .into_iter() .collect::>(); let mut iter = stream.iter_mut(); assert_eq!(iter.len(), 3); assert!(iter.next().is_some()); assert_eq!(iter.len(), 2); assert!(iter.next().is_some()); assert_eq!(iter.len(), 1); assert!(iter.next().is_some()); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] .into_iter() .collect::>(); assert_eq!(stream.len(), 3); assert_eq!(block_on(stream.next()), Some(1)); assert_eq!(stream.len(), 2); let mut iter = stream.iter_mut(); assert_eq!(iter.len(), 2); assert!(iter.next().is_some()); assert_eq!(iter.len(), 1); assert!(iter.next().is_some()); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); assert_eq!(block_on(stream.next()), Some(2)); assert_eq!(stream.len(), 2); assert_eq!(block_on(stream.next()), None); let mut iter = stream.iter_mut(); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); } #[test] fn iter() { let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] .into_iter() .collect::>(); let mut iter = stream.iter(); assert_eq!(iter.len(), 3); assert!(iter.next().is_some()); assert_eq!(iter.len(), 2); assert!(iter.next().is_some()); assert_eq!(iter.len(), 1); assert!(iter.next().is_some()); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] .into_iter() .collect::>(); assert_eq!(stream.len(), 3); assert_eq!(block_on(stream.next()), Some(1)); assert_eq!(stream.len(), 2); let mut iter = stream.iter(); assert_eq!(iter.len(), 2); assert!(iter.next().is_some()); assert_eq!(iter.len(), 1); assert!(iter.next().is_some()); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); assert_eq!(block_on(stream.next()), Some(2)); assert_eq!(stream.len(), 2); assert_eq!(block_on(stream.next()), None); let mut iter = stream.iter(); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); } #[test] fn into_iter() { let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] .into_iter() .collect::>(); let mut iter = stream.into_iter(); assert_eq!(iter.len(), 3); assert!(iter.next().is_some()); assert_eq!(iter.len(), 2); assert!(iter.next().is_some()); assert_eq!(iter.len(), 1); assert!(iter.next().is_some()); assert_eq!(iter.len(), 0); assert!(iter.next().is_none()); }