197 lines
5.7 KiB
Rust
197 lines
5.7 KiB
Rust
use futures::channel::mpsc;
|
|
use futures::executor::{block_on, block_on_stream};
|
|
use futures::future::{self, FutureExt};
|
|
use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
|
|
use futures::task::Poll;
|
|
use futures_test::task::noop_context;
|
|
|
|
// Exercises `FusedStream::is_terminated` on a `SelectAll`: the set reports
// termination only after a poll has returned `None`, and pushing a new stream
// makes it report "not terminated" again.
#[test]
fn is_terminated() {
    let mut ctx = noop_context();
    let mut set = SelectAll::new();

    // A freshly created set has not been polled yet, so it is not terminated.
    assert!(!set.is_terminated());
    assert_eq!(set.poll_next_unpin(&mut ctx), Poll::Ready(None));
    assert!(set.is_terminated());

    // Test that the sentinel value doesn't leak
    assert!(set.is_empty());
    assert_eq!(set.len(), 0);

    set.push(future::ready(1).into_stream());

    assert!(!set.is_empty());
    assert_eq!(set.len(), 1);

    // After the push the set is live again; it stays that way until a poll
    // yields `None`, not merely until the last item has been handed out.
    assert!(!set.is_terminated());
    assert_eq!(set.poll_next_unpin(&mut ctx), Poll::Ready(Some(1)));
    assert!(!set.is_terminated());
    assert_eq!(set.poll_next_unpin(&mut ctx), Poll::Ready(None));
    assert!(set.is_terminated());
}
|
|
|
|
// Regression test named after issue 1626 (presumably in the futures-rs
// tracker — confirm). Two finite streams of different lengths are merged and
// the exact interleaving of their items is pinned down.
#[test]
fn issue_1626() {
    let short = stream::iter(0..=2);
    let long = stream::iter(10..=14);

    let mut merged = block_on_stream(stream::select_all(vec![short, long]));

    // The streams are expected to alternate while both have items; once the
    // shorter one is done the remaining items of the longer one follow.
    for expected in vec![0, 10, 1, 11, 2, 12, 13, 14] {
        assert_eq!(merged.next(), Some(expected));
    }

    // Both inputs are exhausted, so the merged stream must end.
    assert_eq!(merged.next(), None);
}
|
|
|
|
// Merges three unbounded mpsc receivers and checks the order in which items
// sent on the individual channels come out of the combined stream.
#[test]
fn works_1() {
    let (tx_a, rx_a) = mpsc::unbounded::<u32>();
    let (tx_b, rx_b) = mpsc::unbounded::<u32>();
    let (tx_c, rx_c) = mpsc::unbounded::<u32>();

    let mut merged = block_on_stream(select_all(vec![rx_a, rx_b, rx_c]));

    // Two identical rounds: `b` is sent to before `a`, yet the item from `a`
    // is expected to be yielded first.
    for _round in 0..2 {
        tx_b.unbounded_send(99).unwrap();
        tx_a.unbounded_send(33).unwrap();
        assert_eq!(merged.next(), Some(33));
        assert_eq!(merged.next(), Some(99));
    }

    // With a single item in flight it is yielded regardless of its channel.
    tx_c.unbounded_send(42).unwrap();
    assert_eq!(merged.next(), Some(42));
    tx_a.unbounded_send(43).unwrap();
    assert_eq!(merged.next(), Some(43));

    // Dropping every sender closes all receivers, which ends the merged stream.
    drop(tx_a);
    drop(tx_b);
    drop(tx_c);
    assert_eq!(merged.next(), None);
}
|
|
|
|
// Checks `SelectAll::clear`: it empties the set, the set can be refilled
// afterwards, and clearing a terminated set resets its terminated flag.
#[test]
fn clear() {
    let mut set = select_all(vec![stream::iter(vec![1]), stream::iter(vec![2])]);

    let first_item = block_on(set.next());
    assert_eq!(first_item, Some(1));
    assert!(!set.is_empty());

    set.clear();
    assert!(set.is_empty());

    // A cleared set accepts new streams again.
    set.push(stream::iter(vec![3]));
    assert!(!set.is_empty());

    set.clear();
    assert!(set.is_empty());

    // Polling the empty set yields `None` and marks it as terminated...
    let after_clear = block_on(set.next());
    assert_eq!(after_clear, None);
    assert!(set.is_terminated());

    // ...and another `clear` removes that mark.
    set.clear();
    assert!(!set.is_terminated());
}
|
|
|
|
// Checks the iterator returned by `SelectAll::iter_mut`: its `len()` counts
// down as items are taken, and it reflects the streams still held by the set
// after some of them have finished.
#[test]
fn iter_mut() {
    // Walks `it` to exhaustion. `len()` must start at `expected`, drop by one
    // after every successful `next()`, and `next()` must return `None` once
    // the length has reached zero.
    fn assert_counts_down<I: ExactSizeIterator>(mut it: I, expected: usize) {
        assert_eq!(it.len(), expected);
        for remaining in (0..expected).rev() {
            assert!(it.next().is_some());
            assert_eq!(it.len(), remaining);
        }
        assert!(it.next().is_none());
    }

    // Streams that never complete: all three stay in the set.
    let mut pending_set: SelectAll<_> =
        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
            .into_iter()
            .collect();
    assert_counts_down(pending_set.iter_mut(), 3);

    // Finite streams: one is empty, the other two hold a single item each.
    let mut finite_set: SelectAll<_> =
        vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
            .into_iter()
            .collect();

    assert_eq!(finite_set.len(), 3);
    assert_eq!(block_on(finite_set.next()), Some(1));
    assert_eq!(finite_set.len(), 2);
    assert_counts_down(finite_set.iter_mut(), 2);

    // The length is still expected to be 2 right after the last item came out;
    // it only drops to 0 once the set has been polled to `None`.
    assert_eq!(block_on(finite_set.next()), Some(2));
    assert_eq!(finite_set.len(), 2);
    assert_eq!(block_on(finite_set.next()), None);
    assert_counts_down(finite_set.iter_mut(), 0);
}
|
|
|
|
// Checks the iterator returned by `SelectAll::iter`: its `len()` counts down
// as items are taken, and it reflects the streams still held by the set after
// some of them have finished.
#[test]
fn iter() {
    // Walks `it` to exhaustion. `len()` must start at `expected`, drop by one
    // after every successful `next()`, and `next()` must return `None` once
    // the length has reached zero.
    fn assert_counts_down<I: ExactSizeIterator>(mut it: I, expected: usize) {
        assert_eq!(it.len(), expected);
        for remaining in (0..expected).rev() {
            assert!(it.next().is_some());
            assert_eq!(it.len(), remaining);
        }
        assert!(it.next().is_none());
    }

    // Streams that never complete: all three stay in the set.
    let pending_set: SelectAll<_> =
        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
            .into_iter()
            .collect();
    assert_counts_down(pending_set.iter(), 3);

    // Finite streams: one is empty, the other two hold a single item each.
    let mut finite_set: SelectAll<_> =
        vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
            .into_iter()
            .collect();

    assert_eq!(finite_set.len(), 3);
    assert_eq!(block_on(finite_set.next()), Some(1));
    assert_eq!(finite_set.len(), 2);
    assert_counts_down(finite_set.iter(), 2);

    // The length is still expected to be 2 right after the last item came out;
    // it only drops to 0 once the set has been polled to `None`.
    assert_eq!(block_on(finite_set.next()), Some(2));
    assert_eq!(finite_set.len(), 2);
    assert_eq!(block_on(finite_set.next()), None);
    assert_counts_down(finite_set.iter(), 0);
}
|
|
|
|
// Checks the owning iterator obtained from `SelectAll::into_iter`: it hands
// out every stream of the set and its `len()` counts down accordingly.
#[test]
fn into_iter() {
    let pending_set: SelectAll<_> =
        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
            .into_iter()
            .collect();

    let mut streams = pending_set.into_iter();
    assert_eq!(streams.len(), 3);

    // Each successful `next()` must shrink the reported length by one.
    for remaining in (0..3).rev() {
        assert!(streams.next().is_some());
        assert_eq!(streams.len(), remaining);
    }

    // Nothing is left once the length has reached zero.
    assert!(streams.next().is_none());
}
|