summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures/tests/stream_select_all.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures/tests/stream_select_all.rs')
-rw-r--r--third_party/rust/futures/tests/stream_select_all.rs197
1 files changed, 197 insertions, 0 deletions
diff --git a/third_party/rust/futures/tests/stream_select_all.rs b/third_party/rust/futures/tests/stream_select_all.rs
new file mode 100644
index 0000000000..4ae0735762
--- /dev/null
+++ b/third_party/rust/futures/tests/stream_select_all.rs
@@ -0,0 +1,197 @@
+use futures::channel::mpsc;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, FutureExt};
+use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+
+#[test]
+fn is_terminated() {
+ let mut cx = noop_context();
+ let mut tasks = SelectAll::new();
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+
+ // Test that the sentinel value doesn't leak
+ assert_eq!(tasks.is_empty(), true);
+ assert_eq!(tasks.len(), 0);
+
+ tasks.push(future::ready(1).into_stream());
+
+ assert_eq!(tasks.is_empty(), false);
+ assert_eq!(tasks.len(), 1);
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+}
+
+#[test]
+fn issue_1626() {
+ let a = stream::iter(0..=2);
+ let b = stream::iter(10..=14);
+
+ let mut s = block_on_stream(stream::select_all(vec![a, b]));
+
+ assert_eq!(s.next(), Some(0));
+ assert_eq!(s.next(), Some(10));
+ assert_eq!(s.next(), Some(1));
+ assert_eq!(s.next(), Some(11));
+ assert_eq!(s.next(), Some(2));
+ assert_eq!(s.next(), Some(12));
+ assert_eq!(s.next(), Some(13));
+ assert_eq!(s.next(), Some(14));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = mpsc::unbounded::<u32>();
+ let (b_tx, b_rx) = mpsc::unbounded::<u32>();
+ let (c_tx, c_rx) = mpsc::unbounded::<u32>();
+
+ let streams = vec![a_rx, b_rx, c_rx];
+
+ let mut stream = block_on_stream(select_all(streams));
+
+ b_tx.unbounded_send(99).unwrap();
+ a_tx.unbounded_send(33).unwrap();
+ assert_eq!(Some(33), stream.next());
+ assert_eq!(Some(99), stream.next());
+
+ b_tx.unbounded_send(99).unwrap();
+ a_tx.unbounded_send(33).unwrap();
+ assert_eq!(Some(33), stream.next());
+ assert_eq!(Some(99), stream.next());
+
+ c_tx.unbounded_send(42).unwrap();
+ assert_eq!(Some(42), stream.next());
+ a_tx.unbounded_send(43).unwrap();
+ assert_eq!(Some(43), stream.next());
+
+ drop((a_tx, b_tx, c_tx));
+ assert_eq!(None, stream.next());
+}
+
+#[test]
+fn clear() {
+ let mut tasks =
+ select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]);
+
+ assert_eq!(block_on(tasks.next()), Some(1));
+ assert!(!tasks.is_empty());
+
+ tasks.clear();
+ assert!(tasks.is_empty());
+
+ tasks.push(stream::iter(vec![3].into_iter()));
+ assert!(!tasks.is_empty());
+
+ tasks.clear();
+ assert!(tasks.is_empty());
+
+ assert_eq!(block_on(tasks.next()), None);
+ assert!(tasks.is_terminated());
+ tasks.clear();
+ assert!(!tasks.is_terminated());
+}
+
+#[test]
+fn iter_mut() {
+ let mut stream =
+ vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ let mut iter = stream.iter_mut();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.next()), Some(1));
+ assert_eq!(stream.len(), 2);
+ let mut iter = stream.iter_mut();
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ assert_eq!(block_on(stream.next()), Some(2));
+ assert_eq!(stream.len(), 2);
+ assert_eq!(block_on(stream.next()), None);
+ let mut iter = stream.iter_mut();
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn iter() {
+ let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.next()), Some(1));
+ assert_eq!(stream.len(), 2);
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+
+ assert_eq!(block_on(stream.next()), Some(2));
+ assert_eq!(stream.len(), 2);
+ assert_eq!(block_on(stream.next()), None);
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn into_iter() {
+ let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+ .into_iter()
+ .collect::<SelectAll<_>>();
+
+ let mut iter = stream.into_iter();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}