summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/tests
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-stream/tests')
-rw-r--r--third_party/rust/tokio-stream/tests/async_send_sync.rs107
-rw-r--r--third_party/rust/tokio-stream/tests/chunks_timeout.rs84
-rw-r--r--third_party/rust/tokio-stream/tests/stream_chain.rs100
-rw-r--r--third_party/rust/tokio-stream/tests/stream_collect.rs146
-rw-r--r--third_party/rust/tokio-stream/tests/stream_empty.rs11
-rw-r--r--third_party/rust/tokio-stream/tests/stream_fuse.rs50
-rw-r--r--third_party/rust/tokio-stream/tests/stream_iter.rs18
-rw-r--r--third_party/rust/tokio-stream/tests/stream_merge.rs83
-rw-r--r--third_party/rust/tokio-stream/tests/stream_once.rs12
-rw-r--r--third_party/rust/tokio-stream/tests/stream_panic.rs55
-rw-r--r--third_party/rust/tokio-stream/tests/stream_pending.rs14
-rw-r--r--third_party/rust/tokio-stream/tests/stream_stream_map.rs387
-rw-r--r--third_party/rust/tokio-stream/tests/stream_timeout.rs109
-rw-r--r--third_party/rust/tokio-stream/tests/support/mpsc.rs15
-rw-r--r--third_party/rust/tokio-stream/tests/time_throttle.rs28
-rw-r--r--third_party/rust/tokio-stream/tests/watch.rs29
16 files changed, 1248 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/tests/async_send_sync.rs b/third_party/rust/tokio-stream/tests/async_send_sync.rs
new file mode 100644
index 0000000000..f1c8b4efe2
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/async_send_sync.rs
@@ -0,0 +1,107 @@
+#![allow(clippy::diverging_sub_expression)]
+
+use std::rc::Rc;
+
+#[allow(dead_code)]
+type BoxStream<T> = std::pin::Pin<Box<dyn tokio_stream::Stream<Item = T>>>;
+
+#[allow(dead_code)]
+fn require_send<T: Send>(_t: &T) {}
+#[allow(dead_code)]
+fn require_sync<T: Sync>(_t: &T) {}
+#[allow(dead_code)]
+fn require_unpin<T: Unpin>(_t: &T) {}
+
+#[allow(dead_code)]
+struct Invalid;
+
+trait AmbiguousIfSend<A> {
+ fn some_item(&self) {}
+}
+impl<T: ?Sized> AmbiguousIfSend<()> for T {}
+impl<T: ?Sized + Send> AmbiguousIfSend<Invalid> for T {}
+
+trait AmbiguousIfSync<A> {
+ fn some_item(&self) {}
+}
+impl<T: ?Sized> AmbiguousIfSync<()> for T {}
+impl<T: ?Sized + Sync> AmbiguousIfSync<Invalid> for T {}
+
+trait AmbiguousIfUnpin<A> {
+ fn some_item(&self) {}
+}
+impl<T: ?Sized> AmbiguousIfUnpin<()> for T {}
+impl<T: ?Sized + Unpin> AmbiguousIfUnpin<Invalid> for T {}
+
+macro_rules! into_todo {
+ ($typ:ty) => {{
+ let x: $typ = todo!();
+ x
+ }};
+}
+
+macro_rules! async_assert_fn {
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+ require_send(&f);
+ require_sync(&f);
+ };
+ };
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+ require_send(&f);
+ AmbiguousIfSync::some_item(&f);
+ };
+ };
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+ AmbiguousIfSend::some_item(&f);
+ require_sync(&f);
+ };
+ };
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+ AmbiguousIfSend::some_item(&f);
+ AmbiguousIfSync::some_item(&f);
+ };
+ };
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+ AmbiguousIfUnpin::some_item(&f);
+ };
+ };
+ ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => {
+ #[allow(unreachable_code)]
+ #[allow(unused_variables)]
+ const _: fn() = || {
+ let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+ require_unpin(&f);
+ };
+ };
+}
+
+async_assert_fn!(tokio_stream::empty<Rc<u8>>(): Send & Sync);
+async_assert_fn!(tokio_stream::pending<Rc<u8>>(): Send & Sync);
+async_assert_fn!(tokio_stream::iter(std::vec::IntoIter<u8>): Send & Sync);
+
+async_assert_fn!(tokio_stream::StreamExt::next(&mut BoxStream<()>): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::try_next(&mut BoxStream<Result<(), ()>>): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::collect<Vec<()>>(&mut BoxStream<()>): !Unpin);
diff --git a/third_party/rust/tokio-stream/tests/chunks_timeout.rs b/third_party/rust/tokio-stream/tests/chunks_timeout.rs
new file mode 100644
index 0000000000..ffc7deadd7
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/chunks_timeout.rs
@@ -0,0 +1,84 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time;
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::assert_pending;
+use tokio_test::task;
+
+use futures::FutureExt;
+use std::time::Duration;
+
+#[tokio::test(start_paused = true)]
+async fn usage() {
+ let iter = vec![1, 2, 3].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![4].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(4, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test(start_paused = true)]
+async fn full_chunk_with_timeout() {
+ let iter = vec![1, 2].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![3].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
+
+ let iter = vec![4].into_iter();
+ let stream2 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chain(stream2)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+ assert_pending!(chunk_stream.poll_next());
+ time::advance(Duration::from_secs(2)).await;
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test]
+#[ignore]
+async fn real_time() {
+ let iter = vec![1, 2, 3, 4].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let iter = vec![5].into_iter();
+ let stream1 =
+ stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+
+ let chunk_stream = stream0
+ .chain(stream1)
+ .chunks_timeout(3, Duration::from_secs(2));
+
+ let mut chunk_stream = task::spawn(chunk_stream);
+
+ assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+ assert_eq!(chunk_stream.next().await, Some(vec![4]));
+ assert_eq!(chunk_stream.next().await, Some(vec![5]));
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_chain.rs b/third_party/rust/tokio-stream/tests/stream_chain.rs
new file mode 100644
index 0000000000..f3b7edb16a
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_chain.rs
@@ -0,0 +1,100 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+use tokio_test::{assert_pending, assert_ready, task};
+
+mod support {
+ pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+#[tokio::test]
+async fn basic_usage() {
+ let one = stream::iter(vec![1, 2, 3]);
+ let two = stream::iter(vec![4, 5, 6]);
+
+ let mut stream = one.chain(two);
+
+ assert_eq!(stream.size_hint(), (6, Some(6)));
+ assert_eq!(stream.next().await, Some(1));
+
+ assert_eq!(stream.size_hint(), (5, Some(5)));
+ assert_eq!(stream.next().await, Some(2));
+
+ assert_eq!(stream.size_hint(), (4, Some(4)));
+ assert_eq!(stream.next().await, Some(3));
+
+ assert_eq!(stream.size_hint(), (3, Some(3)));
+ assert_eq!(stream.next().await, Some(4));
+
+ assert_eq!(stream.size_hint(), (2, Some(2)));
+ assert_eq!(stream.next().await, Some(5));
+
+ assert_eq!(stream.size_hint(), (1, Some(1)));
+ assert_eq!(stream.next().await, Some(6));
+
+ assert_eq!(stream.size_hint(), (0, Some(0)));
+ assert_eq!(stream.next().await, None);
+
+ assert_eq!(stream.size_hint(), (0, Some(0)));
+ assert_eq!(stream.next().await, None);
+}
+
+#[tokio::test]
+async fn pending_first() {
+ let (tx1, rx1) = mpsc::unbounded_channel_stream();
+ let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+ let mut stream = task::spawn(rx1.chain(rx2));
+ assert_eq!(stream.size_hint(), (0, None));
+
+ assert_pending!(stream.poll_next());
+
+ tx2.send(2).unwrap();
+ assert!(!stream.is_woken());
+
+ assert_pending!(stream.poll_next());
+
+ tx1.send(1).unwrap();
+ assert!(stream.is_woken());
+ assert_eq!(Some(1), assert_ready!(stream.poll_next()));
+
+ assert_pending!(stream.poll_next());
+
+ drop(tx1);
+
+ assert_eq!(stream.size_hint(), (0, None));
+
+ assert!(stream.is_woken());
+ assert_eq!(Some(2), assert_ready!(stream.poll_next()));
+
+ assert_eq!(stream.size_hint(), (0, None));
+
+ drop(tx2);
+
+ assert_eq!(stream.size_hint(), (0, None));
+ assert_eq!(None, assert_ready!(stream.poll_next()));
+}
+
+#[test]
+fn size_overflow() {
+ struct Monster;
+
+ impl tokio_stream::Stream for Monster {
+ type Item = ();
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<()>> {
+ panic!()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (usize::MAX, Some(usize::MAX))
+ }
+ }
+
+ let m1 = Monster;
+ let m2 = Monster;
+ let m = m1.chain(m2);
+ assert_eq!(m.size_hint(), (usize::MAX, None));
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_collect.rs b/third_party/rust/tokio-stream/tests/stream_collect.rs
new file mode 100644
index 0000000000..07659a1fc3
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_collect.rs
@@ -0,0 +1,146 @@
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
+
+mod support {
+ pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+#[allow(clippy::let_unit_value)]
+#[tokio::test]
+async fn empty_unit() {
+ // Drains the stream.
+ let mut iter = vec![(), (), ()].into_iter();
+ let _: () = stream::iter(&mut iter).collect().await;
+ assert!(iter.next().is_none());
+}
+
+#[tokio::test]
+async fn empty_vec() {
+ let coll: Vec<u32> = stream::empty().collect().await;
+ assert!(coll.is_empty());
+}
+
+#[tokio::test]
+async fn empty_box_slice() {
+ let coll: Box<[u32]> = stream::empty().collect().await;
+ assert!(coll.is_empty());
+}
+
+#[tokio::test]
+async fn empty_string() {
+ let coll: String = stream::empty::<&str>().collect().await;
+ assert!(coll.is_empty());
+}
+
+#[tokio::test]
+async fn empty_result() {
+ let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
+ assert_eq!(Ok(vec![]), coll);
+}
+
+#[tokio::test]
+async fn collect_vec_items() {
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+ let mut fut = task::spawn(rx.collect::<Vec<i32>>());
+
+ assert_pending!(fut.poll());
+
+ tx.send(1).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ tx.send(2).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ drop(tx);
+ assert!(fut.is_woken());
+ let coll = assert_ready!(fut.poll());
+ assert_eq!(vec![1, 2], coll);
+}
+
+#[tokio::test]
+async fn collect_string_items() {
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+
+ let mut fut = task::spawn(rx.collect::<String>());
+
+ assert_pending!(fut.poll());
+
+ tx.send("hello ".to_string()).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ tx.send("world".to_string()).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ drop(tx);
+ assert!(fut.is_woken());
+ let coll = assert_ready!(fut.poll());
+ assert_eq!("hello world", coll);
+}
+
+#[tokio::test]
+async fn collect_str_items() {
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+
+ let mut fut = task::spawn(rx.collect::<String>());
+
+ assert_pending!(fut.poll());
+
+ tx.send("hello ").unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ tx.send("world").unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ drop(tx);
+ assert!(fut.is_woken());
+ let coll = assert_ready!(fut.poll());
+ assert_eq!("hello world", coll);
+}
+
+#[tokio::test]
+async fn collect_results_ok() {
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+
+ let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
+
+ assert_pending!(fut.poll());
+
+ tx.send(Ok("hello ")).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ tx.send(Ok("world")).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ drop(tx);
+ assert!(fut.is_woken());
+ let coll = assert_ready_ok!(fut.poll());
+ assert_eq!("hello world", coll);
+}
+
+#[tokio::test]
+async fn collect_results_err() {
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+
+ let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
+
+ assert_pending!(fut.poll());
+
+ tx.send(Ok("hello ")).unwrap();
+ assert!(fut.is_woken());
+ assert_pending!(fut.poll());
+
+ tx.send(Err("oh no")).unwrap();
+ assert!(fut.is_woken());
+ let err = assert_ready_err!(fut.poll());
+ assert_eq!("oh no", err);
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_empty.rs b/third_party/rust/tokio-stream/tests/stream_empty.rs
new file mode 100644
index 0000000000..c06f5c41c0
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_empty.rs
@@ -0,0 +1,11 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut stream = stream::empty::<i32>();
+
+ for _ in 0..2 {
+ assert_eq!(stream.size_hint(), (0, Some(0)));
+ assert_eq!(None, stream.next().await);
+ }
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_fuse.rs b/third_party/rust/tokio-stream/tests/stream_fuse.rs
new file mode 100644
index 0000000000..9b6cf054cf
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_fuse.rs
@@ -0,0 +1,50 @@
+use tokio_stream::{Stream, StreamExt};
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+// a stream which alternates between Some and None
+struct Alternate {
+ state: i32,
+}
+
+impl Stream for Alternate {
+ type Item = i32;
+
+ fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
+ let val = self.state;
+ self.state += 1;
+
+ // if it's even, Some(i32), else None
+ if val % 2 == 0 {
+ Poll::Ready(Some(val))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+}
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut stream = Alternate { state: 0 };
+
+ // the stream goes back and forth
+ assert_eq!(stream.next().await, Some(0));
+ assert_eq!(stream.next().await, None);
+ assert_eq!(stream.next().await, Some(2));
+ assert_eq!(stream.next().await, None);
+
+ // however, once it is fused
+ let mut stream = stream.fuse();
+
+ assert_eq!(stream.size_hint(), (0, None));
+ assert_eq!(stream.next().await, Some(4));
+
+ assert_eq!(stream.size_hint(), (0, None));
+ assert_eq!(stream.next().await, None);
+
+ // it will always return `None` after the first time.
+ assert_eq!(stream.size_hint(), (0, Some(0)));
+ assert_eq!(stream.next().await, None);
+ assert_eq!(stream.size_hint(), (0, Some(0)));
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_iter.rs b/third_party/rust/tokio-stream/tests/stream_iter.rs
new file mode 100644
index 0000000000..8b9ee3ce5b
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_iter.rs
@@ -0,0 +1,18 @@
+use tokio_stream as stream;
+use tokio_test::task;
+
+use std::iter;
+
+#[tokio::test]
+async fn coop() {
+ let mut stream = task::spawn(stream::iter(iter::repeat(1)));
+
+ for _ in 0..10_000 {
+ if stream.poll_next().is_pending() {
+ assert!(stream.is_woken());
+ return;
+ }
+ }
+
+ panic!("did not yield");
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_merge.rs b/third_party/rust/tokio-stream/tests/stream_merge.rs
new file mode 100644
index 0000000000..f603bccf88
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_merge.rs
@@ -0,0 +1,83 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+use tokio_test::task;
+use tokio_test::{assert_pending, assert_ready};
+
+mod support {
+ pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+#[tokio::test]
+async fn merge_sync_streams() {
+ let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5]));
+
+ for i in 0..7 {
+ let rem = 7 - i;
+ assert_eq!(s.size_hint(), (rem, Some(rem)));
+ assert_eq!(Some(i), s.next().await);
+ }
+
+ assert!(s.next().await.is_none());
+}
+
+#[tokio::test]
+async fn merge_async_streams() {
+ let (tx1, rx1) = mpsc::unbounded_channel_stream();
+ let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+ let mut rx = task::spawn(rx1.merge(rx2));
+
+ assert_eq!(rx.size_hint(), (0, None));
+
+ assert_pending!(rx.poll_next());
+
+ tx1.send(1).unwrap();
+
+ assert!(rx.is_woken());
+ assert_eq!(Some(1), assert_ready!(rx.poll_next()));
+
+ assert_pending!(rx.poll_next());
+ tx2.send(2).unwrap();
+
+ assert!(rx.is_woken());
+ assert_eq!(Some(2), assert_ready!(rx.poll_next()));
+ assert_pending!(rx.poll_next());
+
+ drop(tx1);
+ assert!(rx.is_woken());
+ assert_pending!(rx.poll_next());
+
+ tx2.send(3).unwrap();
+ assert!(rx.is_woken());
+ assert_eq!(Some(3), assert_ready!(rx.poll_next()));
+ assert_pending!(rx.poll_next());
+
+ drop(tx2);
+ assert!(rx.is_woken());
+ assert_eq!(None, assert_ready!(rx.poll_next()));
+}
+
+#[test]
+fn size_overflow() {
+ struct Monster;
+
+ impl tokio_stream::Stream for Monster {
+ type Item = ();
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<()>> {
+ panic!()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (usize::MAX, Some(usize::MAX))
+ }
+ }
+
+ let m1 = Monster;
+ let m2 = Monster;
+ let m = m1.merge(m2);
+ assert_eq!(m.size_hint(), (usize::MAX, None));
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_once.rs b/third_party/rust/tokio-stream/tests/stream_once.rs
new file mode 100644
index 0000000000..f32bad3a12
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_once.rs
@@ -0,0 +1,12 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut one = stream::once(1);
+
+ assert_eq!(one.size_hint(), (1, Some(1)));
+ assert_eq!(Some(1), one.next().await);
+
+ assert_eq!(one.size_hint(), (0, Some(0)));
+ assert_eq!(None, one.next().await);
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_panic.rs b/third_party/rust/tokio-stream/tests/stream_panic.rs
new file mode 100644
index 0000000000..22c1c20800
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_panic.rs
@@ -0,0 +1,55 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery
+
+use parking_lot::{const_mutex, Mutex};
+use std::error::Error;
+use std::panic;
+use std::sync::Arc;
+use tokio::time::Duration;
+use tokio_stream::{self as stream, StreamExt};
+
+fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
+ static PANIC_MUTEX: Mutex<()> = const_mutex(());
+
+ {
+ let _guard = PANIC_MUTEX.lock();
+ let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
+
+ let prev_hook = panic::take_hook();
+ {
+ let panic_file = panic_file.clone();
+ panic::set_hook(Box::new(move |panic_info| {
+ let panic_location = panic_info.location().unwrap();
+ panic_file
+ .lock()
+ .clone_from(&Some(panic_location.file().to_string()));
+ }));
+ }
+
+ let result = panic::catch_unwind(func);
+ // Return to the previously set panic hook (maybe default) so that we get nice error
+ // messages in the tests.
+ panic::set_hook(prev_hook);
+
+ if result.is_err() {
+ panic_file.lock().clone()
+ } else {
+ None
+ }
+ }
+}
+
+#[test]
+fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> {
+ let panic_location_file = test_panic(|| {
+ let iter = vec![1, 2, 3].into_iter();
+ let stream0 = stream::iter(iter);
+
+ let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2));
+ });
+
+ // The panic location should be in this file
+ assert_eq!(&panic_location_file.unwrap(), file!());
+
+ Ok(())
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_pending.rs b/third_party/rust/tokio-stream/tests/stream_pending.rs
new file mode 100644
index 0000000000..87b5d03bda
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_pending.rs
@@ -0,0 +1,14 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+use tokio_test::{assert_pending, task};
+
+#[tokio::test]
+async fn basic_usage() {
+ let mut stream = stream::pending::<i32>();
+
+ for _ in 0..2 {
+ assert_eq!(stream.size_hint(), (0, None));
+
+ let mut next = task::spawn(async { stream.next().await });
+ assert_pending!(next.poll());
+ }
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_stream_map.rs b/third_party/rust/tokio-stream/tests/stream_stream_map.rs
new file mode 100644
index 0000000000..ffc489b32e
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_stream_map.rs
@@ -0,0 +1,387 @@
+use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
+use tokio_test::{assert_ok, assert_pending, assert_ready, task};
+
+mod support {
+ pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+use std::pin::Pin;
+
+macro_rules! assert_ready_some {
+ ($($t:tt)*) => {
+ match assert_ready!($($t)*) {
+ Some(v) => v,
+ None => panic!("expected `Some`, got `None`"),
+ }
+ };
+}
+
+macro_rules! assert_ready_none {
+ ($($t:tt)*) => {
+ match assert_ready!($($t)*) {
+ None => {}
+ Some(v) => panic!("expected `None`, got `Some({:?})`", v),
+ }
+ };
+}
+
+#[tokio::test]
+async fn empty() {
+ let mut map = StreamMap::<&str, stream::Pending<()>>::new();
+
+ assert_eq!(map.len(), 0);
+ assert!(map.is_empty());
+
+ assert!(map.next().await.is_none());
+ assert!(map.next().await.is_none());
+
+ assert!(map.remove("foo").is_none());
+}
+
+#[tokio::test]
+async fn single_entry() {
+ let mut map = task::spawn(StreamMap::new());
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+ let rx = Box::pin(rx);
+
+ assert_ready_none!(map.poll_next());
+
+ assert!(map.insert("foo", rx).is_none());
+ assert!(map.contains_key("foo"));
+ assert!(!map.contains_key("bar"));
+
+ assert_eq!(map.len(), 1);
+ assert!(!map.is_empty());
+
+ assert_pending!(map.poll_next());
+
+ assert_ok!(tx.send(1));
+
+ assert!(map.is_woken());
+ let (k, v) = assert_ready_some!(map.poll_next());
+ assert_eq!(k, "foo");
+ assert_eq!(v, 1);
+
+ assert_pending!(map.poll_next());
+
+ assert_ok!(tx.send(2));
+
+ assert!(map.is_woken());
+ let (k, v) = assert_ready_some!(map.poll_next());
+ assert_eq!(k, "foo");
+ assert_eq!(v, 2);
+
+ assert_pending!(map.poll_next());
+ drop(tx);
+ assert!(map.is_woken());
+ assert_ready_none!(map.poll_next());
+}
+
+#[tokio::test]
+async fn multiple_entries() {
+ let mut map = task::spawn(StreamMap::new());
+ let (tx1, rx1) = mpsc::unbounded_channel_stream();
+ let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+ let rx1 = Box::pin(rx1);
+ let rx2 = Box::pin(rx2);
+
+ map.insert("foo", rx1);
+ map.insert("bar", rx2);
+
+ assert_pending!(map.poll_next());
+
+ assert_ok!(tx1.send(1));
+
+ assert!(map.is_woken());
+ let (k, v) = assert_ready_some!(map.poll_next());
+ assert_eq!(k, "foo");
+ assert_eq!(v, 1);
+
+ assert_pending!(map.poll_next());
+
+ assert_ok!(tx2.send(2));
+
+ assert!(map.is_woken());
+ let (k, v) = assert_ready_some!(map.poll_next());
+ assert_eq!(k, "bar");
+ assert_eq!(v, 2);
+
+ assert_pending!(map.poll_next());
+
+ assert_ok!(tx1.send(3));
+ assert_ok!(tx2.send(4));
+
+ assert!(map.is_woken());
+
+ // Given the randomization, there is no guarantee what order the values will
+ // be received in.
+ let mut v = (0..2)
+ .map(|_| assert_ready_some!(map.poll_next()))
+ .collect::<Vec<_>>();
+
+ assert_pending!(map.poll_next());
+
+ v.sort_unstable();
+ assert_eq!(v[0].0, "bar");
+ assert_eq!(v[0].1, 4);
+ assert_eq!(v[1].0, "foo");
+ assert_eq!(v[1].1, 3);
+
+ drop(tx1);
+ assert!(map.is_woken());
+ assert_pending!(map.poll_next());
+ drop(tx2);
+
+ assert_ready_none!(map.poll_next());
+}
+
+#[tokio::test]
+async fn insert_remove() {
+ let mut map = task::spawn(StreamMap::new());
+ let (tx, rx) = mpsc::unbounded_channel_stream();
+
+ let rx = Box::pin(rx);
+
+ assert_ready_none!(map.poll_next());
+
+ assert!(map.insert("foo", rx).is_none());
+ let rx = map.remove("foo").unwrap();
+
+ assert_ok!(tx.send(1));
+
+ assert!(!map.is_woken());
+ assert_ready_none!(map.poll_next());
+
+ assert!(map.insert("bar", rx).is_none());
+
+ let v = assert_ready_some!(map.poll_next());
+ assert_eq!(v.0, "bar");
+ assert_eq!(v.1, 1);
+
+ assert!(map.remove("bar").is_some());
+ assert_ready_none!(map.poll_next());
+
+ assert!(map.is_empty());
+ assert_eq!(0, map.len());
+}
+
+#[tokio::test]
+async fn replace() {
+ let mut map = task::spawn(StreamMap::new());
+ let (tx1, rx1) = mpsc::unbounded_channel_stream();
+ let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+ let rx1 = Box::pin(rx1);
+ let rx2 = Box::pin(rx2);
+
+ assert!(map.insert("foo", rx1).is_none());
+
+ assert_pending!(map.poll_next());
+
+ let _rx1 = map.insert("foo", rx2).unwrap();
+
+ assert_pending!(map.poll_next());
+
+ tx1.send(1).unwrap();
+ assert_pending!(map.poll_next());
+
+ tx2.send(2).unwrap();
+ assert!(map.is_woken());
+ let v = assert_ready_some!(map.poll_next());
+ assert_eq!(v.0, "foo");
+ assert_eq!(v.1, 2);
+}
+
+#[test]
+fn size_hint_with_upper() {
+ let mut map = StreamMap::new();
+
+ map.insert("a", stream::iter(vec![1]));
+ map.insert("b", stream::iter(vec![1, 2]));
+ map.insert("c", stream::iter(vec![1, 2, 3]));
+
+ assert_eq!(3, map.len());
+ assert!(!map.is_empty());
+
+ let size_hint = map.size_hint();
+ assert_eq!(size_hint, (6, Some(6)));
+}
+
+#[test]
+fn size_hint_without_upper() {
+ let mut map = StreamMap::new();
+
+ map.insert("a", pin_box(stream::iter(vec![1])));
+ map.insert("b", pin_box(stream::iter(vec![1, 2])));
+ map.insert("c", pin_box(pending()));
+
+ let size_hint = map.size_hint();
+ assert_eq!(size_hint, (3, None));
+}
+
+#[test]
+fn new_capacity_zero() {
+ let map = StreamMap::<&str, stream::Pending<()>>::new();
+ assert_eq!(0, map.capacity());
+
+ assert!(map.keys().next().is_none());
+}
+
+#[test]
+fn with_capacity() {
+ let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
+ assert!(10 <= map.capacity());
+
+ assert!(map.keys().next().is_none());
+}
+
+#[test]
+fn iter_keys() {
+ let mut map = StreamMap::new();
+
+ map.insert("a", pending::<i32>());
+ map.insert("b", pending());
+ map.insert("c", pending());
+
+ let mut keys = map.keys().collect::<Vec<_>>();
+ keys.sort_unstable();
+
+ assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
+}
+
+#[test]
+fn iter_values() {
+ let mut map = StreamMap::new();
+
+ map.insert("a", stream::iter(vec![1]));
+ map.insert("b", stream::iter(vec![1, 2]));
+ map.insert("c", stream::iter(vec![1, 2, 3]));
+
+ let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
+
+ size_hints.sort_unstable();
+
+ assert_eq!(&size_hints[..], &[1, 2, 3]);
+}
+
+#[test]
+fn iter_values_mut() {
+ let mut map = StreamMap::new();
+
+ map.insert("a", stream::iter(vec![1]));
+ map.insert("b", stream::iter(vec![1, 2]));
+ map.insert("c", stream::iter(vec![1, 2, 3]));
+
+ let mut size_hints = map
+ .values_mut()
+ .map(|s: &mut _| s.size_hint().0)
+ .collect::<Vec<_>>();
+
+ size_hints.sort_unstable();
+
+ assert_eq!(&size_hints[..], &[1, 2, 3]);
+}
+
+#[test]
+fn clear() {
+ let mut map = task::spawn(StreamMap::new());
+
+ map.insert("a", stream::iter(vec![1]));
+ map.insert("b", stream::iter(vec![1, 2]));
+ map.insert("c", stream::iter(vec![1, 2, 3]));
+
+ assert_ready_some!(map.poll_next());
+
+ map.clear();
+
+ assert_ready_none!(map.poll_next());
+ assert!(map.is_empty());
+}
+
+#[test]
+fn contains_key_borrow() {
+ let mut map = StreamMap::new();
+ map.insert("foo".to_string(), pending::<()>());
+
+ assert!(map.contains_key("foo"));
+}
+
+#[test]
+fn one_ready_many_none() {
+ // Run a few times because of randomness
+ for _ in 0..100 {
+ let mut map = task::spawn(StreamMap::new());
+
+ map.insert(0, pin_box(stream::empty()));
+ map.insert(1, pin_box(stream::empty()));
+ map.insert(2, pin_box(stream::once("hello")));
+ map.insert(3, pin_box(stream::pending()));
+
+ let v = assert_ready_some!(map.poll_next());
+ assert_eq!(v, (2, "hello"));
+ }
+}
+
+#[cfg(not(target_os = "wasi"))]
+proptest::proptest! {
+ #[test]
+ fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
+ use std::task::{Context, Poll};
+
+ struct DidPoll<T> {
+ did_poll: bool,
+ inner: T,
+ }
+
+ impl<T: Stream + Unpin> Stream for DidPoll<T> {
+ type Item = T::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> Poll<Option<T::Item>>
+ {
+ self.did_poll = true;
+ Pin::new(&mut self.inner).poll_next(cx)
+ }
+ }
+
+ for _ in 0..10 {
+ let mut map = task::spawn(StreamMap::new());
+ let mut expect = 0;
+
+ for (i, &is_empty) in kinds.iter().enumerate() {
+ let inner = if is_empty {
+ pin_box(stream::empty::<()>())
+ } else {
+ expect += 1;
+ pin_box(stream::pending::<()>())
+ };
+
+ let stream = DidPoll {
+ did_poll: false,
+ inner,
+ };
+
+ map.insert(i, stream);
+ }
+
+ if expect == 0 {
+ assert_ready_none!(map.poll_next());
+ } else {
+ assert_pending!(map.poll_next());
+
+ assert_eq!(expect, map.values().count());
+
+ for stream in map.values() {
+ assert!(stream.did_poll);
+ }
+ }
+ }
+ }
+}
+
+fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
+ Box::pin(s)
+}
diff --git a/third_party/rust/tokio-stream/tests/stream_timeout.rs b/third_party/rust/tokio-stream/tests/stream_timeout.rs
new file mode 100644
index 0000000000..2338f83358
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/stream_timeout.rs
@@ -0,0 +1,109 @@
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time::{self, sleep, Duration};
+use tokio_stream::{self, StreamExt};
+use tokio_test::*;
+
+use futures::stream;
+
+async fn maybe_sleep(idx: i32) -> i32 {
+ if idx % 2 == 0 {
+ sleep(ms(200)).await;
+ }
+ idx
+}
+
+fn ms(n: u64) -> Duration {
+ Duration::from_millis(n)
+}
+
+#[tokio::test]
+async fn basic_usage() {
+ time::pause();
+
+ // Items 2 and 4 time out. If we run the stream until it completes,
+ // we end up with the following items:
+ //
+ // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]
+
+ let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100));
+ let mut stream = task::spawn(stream);
+
+ // First item completes immediately
+ assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
+
+ // Second item is delayed 200ms, times out after 100ms
+ assert_pending!(stream.poll_next());
+
+ time::advance(ms(150)).await;
+ let v = assert_ready!(stream.poll_next());
+ assert!(v.unwrap().is_err());
+
+ assert_pending!(stream.poll_next());
+
+ time::advance(ms(100)).await;
+ assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
+
+ // Third item is ready immediately
+ assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
+
+ // Fourth item is delayed 200ms, times out after 100ms
+ assert_pending!(stream.poll_next());
+
+ time::advance(ms(60)).await;
+ assert_pending!(stream.poll_next()); // nothing ready yet
+
+ time::advance(ms(60)).await;
+ let v = assert_ready!(stream.poll_next());
+ assert!(v.unwrap().is_err()); // timeout!
+
+ time::advance(ms(120)).await;
+ assert_ready_eq!(stream.poll_next(), Some(Ok(4)));
+
+ // Done.
+ assert_ready_eq!(stream.poll_next(), None);
+}
+
+#[tokio::test]
+async fn return_elapsed_errors_only_once() {
+ time::pause();
+
+ let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50));
+ let mut stream = task::spawn(stream);
+
+ // First item completes immediately
+ assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
+
+ // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
+ // error is returned.
+ assert_pending!(stream.poll_next());
+ //
+ time::advance(ms(51)).await;
+ let v = assert_ready!(stream.poll_next());
+ assert!(v.unwrap().is_err()); // timeout!
+
+ // deadline elapses again, but no error is returned
+ time::advance(ms(50)).await;
+ assert_pending!(stream.poll_next());
+
+ time::advance(ms(100)).await;
+ assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
+ assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
+
+ // Done
+ assert_ready_eq!(stream.poll_next(), None);
+}
+
+#[tokio::test]
+async fn no_timeouts() {
+ let stream = stream::iter(vec![1, 3, 5])
+ .then(maybe_sleep)
+ .timeout(ms(100));
+
+ let mut stream = task::spawn(stream);
+
+ assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
+ assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
+ assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
+ assert_ready_eq!(stream.poll_next(), None);
+}
diff --git a/third_party/rust/tokio-stream/tests/support/mpsc.rs b/third_party/rust/tokio-stream/tests/support/mpsc.rs
new file mode 100644
index 0000000000..09dbe04215
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/support/mpsc.rs
@@ -0,0 +1,15 @@
+use async_stream::stream;
+use tokio::sync::mpsc::{self, UnboundedSender};
+use tokio_stream::Stream;
+
+pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ let stream = stream! {
+ while let Some(item) = rx.recv().await {
+ yield item;
+ }
+ };
+
+ (tx, stream)
+}
diff --git a/third_party/rust/tokio-stream/tests/time_throttle.rs b/third_party/rust/tokio-stream/tests/time_throttle.rs
new file mode 100644
index 0000000000..e6c9917be3
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/time_throttle.rs
@@ -0,0 +1,28 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time;
+use tokio_stream::StreamExt;
+use tokio_test::*;
+
+use std::time::Duration;
+
+#[tokio::test]
+async fn usage() {
+ time::pause();
+
+ let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100)));
+
+ assert_ready!(stream.poll_next());
+ assert_pending!(stream.poll_next());
+
+ time::advance(Duration::from_millis(90)).await;
+
+ assert_pending!(stream.poll_next());
+
+ time::advance(Duration::from_millis(101)).await;
+
+ assert!(stream.is_woken());
+
+ assert_ready!(stream.poll_next());
+}
diff --git a/third_party/rust/tokio-stream/tests/watch.rs b/third_party/rust/tokio-stream/tests/watch.rs
new file mode 100644
index 0000000000..a56254edef
--- /dev/null
+++ b/third_party/rust/tokio-stream/tests/watch.rs
@@ -0,0 +1,29 @@
+#![cfg(feature = "sync")]
+
+use tokio::sync::watch;
+use tokio_stream::wrappers::WatchStream;
+use tokio_stream::StreamExt;
+
+#[tokio::test]
+async fn message_not_twice() {
+ let (tx, rx) = watch::channel("hello");
+
+ let mut counter = 0;
+ let mut stream = WatchStream::new(rx).map(move |payload| {
+ println!("{}", payload);
+ if payload == "goodbye" {
+ counter += 1;
+ }
+ if counter >= 2 {
+ panic!("too many goodbyes");
+ }
+ });
+
+ let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
+
+ // Send goodbye just once
+ tx.send("goodbye").unwrap();
+
+ drop(tx);
+ task.await.unwrap();
+}