diff options
Diffstat (limited to 'third_party/rust/tokio-stream/tests')
16 files changed, 1248 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/tests/async_send_sync.rs b/third_party/rust/tokio-stream/tests/async_send_sync.rs new file mode 100644 index 0000000000..f1c8b4efe2 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/async_send_sync.rs @@ -0,0 +1,107 @@ +#![allow(clippy::diverging_sub_expression)] + +use std::rc::Rc; + +#[allow(dead_code)] +type BoxStream<T> = std::pin::Pin<Box<dyn tokio_stream::Stream<Item = T>>>; + +#[allow(dead_code)] +fn require_send<T: Send>(_t: &T) {} +#[allow(dead_code)] +fn require_sync<T: Sync>(_t: &T) {} +#[allow(dead_code)] +fn require_unpin<T: Unpin>(_t: &T) {} + +#[allow(dead_code)] +struct Invalid; + +trait AmbiguousIfSend<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfSend<()> for T {} +impl<T: ?Sized + Send> AmbiguousIfSend<Invalid> for T {} + +trait AmbiguousIfSync<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfSync<()> for T {} +impl<T: ?Sized + Sync> AmbiguousIfSync<Invalid> for T {} + +trait AmbiguousIfUnpin<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfUnpin<()> for T {} +impl<T: ?Sized + Unpin> AmbiguousIfUnpin<Invalid> for T {} + +macro_rules! into_todo { + ($typ:ty) => {{ + let x: $typ = todo!(); + x + }}; +} + +macro_rules! async_assert_fn { + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfUnpin::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_unpin(&f); + }; + }; +} + +async_assert_fn!(tokio_stream::empty<Rc<u8>>(): Send & Sync); +async_assert_fn!(tokio_stream::pending<Rc<u8>>(): Send & Sync); +async_assert_fn!(tokio_stream::iter(std::vec::IntoIter<u8>): Send & Sync); + +async_assert_fn!(tokio_stream::StreamExt::next(&mut BoxStream<()>): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::try_next(&mut BoxStream<Result<(), ()>>): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::collect<Vec<()>>(&mut BoxStream<()>): !Unpin); diff --git a/third_party/rust/tokio-stream/tests/chunks_timeout.rs b/third_party/rust/tokio-stream/tests/chunks_timeout.rs new file mode 100644 index 0000000000..ffc7deadd7 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/chunks_timeout.rs @@ -0,0 +1,84 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::{self as stream, StreamExt}; +use tokio_test::assert_pending; +use tokio_test::task; + +use futures::FutureExt; +use std::time::Duration; + +#[tokio::test(start_paused = true)] +async fn usage() { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![4].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(4, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test(start_paused = true)] +async fn full_chunk_with_timeout() { + let iter = vec![1, 2].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![3].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n)); + + let iter = vec![4].into_iter(); + let stream2 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chain(stream2) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + + assert_pending!(chunk_stream.poll_next()); + time::advance(Duration::from_secs(2)).await; + assert_eq!(chunk_stream.next().await, Some(vec![4])); +} + +#[tokio::test] +#[ignore] +async fn real_time() { + let iter = vec![1, 2, 3, 4].into_iter(); + let stream0 = stream::iter(iter); + + let iter = vec![5].into_iter(); + let stream1 = + stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); + + let chunk_stream = stream0 + .chain(stream1) + .chunks_timeout(3, Duration::from_secs(2)); + + let mut chunk_stream = task::spawn(chunk_stream); + + assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); + assert_eq!(chunk_stream.next().await, Some(vec![4])); + assert_eq!(chunk_stream.next().await, Some(vec![5])); +} diff --git a/third_party/rust/tokio-stream/tests/stream_chain.rs b/third_party/rust/tokio-stream/tests/stream_chain.rs new file mode 100644 index 0000000000..f3b7edb16a --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_chain.rs @@ -0,0 +1,100 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; +use tokio_test::{assert_pending, assert_ready, task}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +#[tokio::test] +async fn basic_usage() { + let one = stream::iter(vec![1, 2, 3]); + let two = stream::iter(vec![4, 5, 6]); + + let mut stream = one.chain(two); + + assert_eq!(stream.size_hint(), (6, Some(6))); + assert_eq!(stream.next().await, Some(1)); + + assert_eq!(stream.size_hint(), (5, Some(5))); + assert_eq!(stream.next().await, Some(2)); + + assert_eq!(stream.size_hint(), (4, Some(4))); + assert_eq!(stream.next().await, Some(3)); + + assert_eq!(stream.size_hint(), (3, Some(3))); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (2, Some(2))); + assert_eq!(stream.next().await, Some(5)); + + assert_eq!(stream.size_hint(), (1, Some(1))); + assert_eq!(stream.next().await, Some(6)); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); +} + +#[tokio::test] +async fn pending_first() { + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let mut stream = task::spawn(rx1.chain(rx2)); + assert_eq!(stream.size_hint(), (0, None)); + + assert_pending!(stream.poll_next()); + + tx2.send(2).unwrap(); + assert!(!stream.is_woken()); + + assert_pending!(stream.poll_next()); + + tx1.send(1).unwrap(); + assert!(stream.is_woken()); + assert_eq!(Some(1), assert_ready!(stream.poll_next())); + + assert_pending!(stream.poll_next()); + + drop(tx1); + + assert_eq!(stream.size_hint(), (0, None)); + + assert!(stream.is_woken()); + assert_eq!(Some(2), assert_ready!(stream.poll_next())); + + assert_eq!(stream.size_hint(), (0, None)); + + drop(tx2); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(None, assert_ready!(stream.poll_next())); +} + +#[test] +fn size_overflow() { + struct Monster; + + impl tokio_stream::Stream for Monster { + type Item = (); + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<()>> { + panic!() + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (usize::MAX, Some(usize::MAX)) + } + } + + let m1 = Monster; + let m2 = Monster; + let m = m1.chain(m2); + assert_eq!(m.size_hint(), (usize::MAX, None)); +} diff --git a/third_party/rust/tokio-stream/tests/stream_collect.rs b/third_party/rust/tokio-stream/tests/stream_collect.rs new file mode 100644 index 0000000000..07659a1fc3 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_collect.rs @@ -0,0 +1,146 @@ +use tokio_stream::{self as stream, StreamExt}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +#[allow(clippy::let_unit_value)] +#[tokio::test] +async fn empty_unit() { + // Drains the stream. + let mut iter = vec![(), (), ()].into_iter(); + let _: () = stream::iter(&mut iter).collect().await; + assert!(iter.next().is_none()); +} + +#[tokio::test] +async fn empty_vec() { + let coll: Vec<u32> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_box_slice() { + let coll: Box<[u32]> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_string() { + let coll: String = stream::empty::<&str>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_result() { + let coll: Result<Vec<u32>, &str> = stream::empty().collect().await; + assert_eq!(Ok(vec![]), coll); +} + +#[tokio::test] +async fn collect_vec_items() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + let mut fut = task::spawn(rx.collect::<Vec<i32>>()); + + assert_pending!(fut.poll()); + + tx.send(1).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(2).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!(vec![1, 2], coll); +} + +#[tokio::test] +async fn collect_string_items() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<String>()); + + assert_pending!(fut.poll()); + + tx.send("hello ".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_str_items() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<String>()); + + assert_pending!(fut.poll()); + + tx.send("hello ").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_results_ok() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Ok("world")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready_ok!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_results_err() { + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Err("oh no")).unwrap(); + assert!(fut.is_woken()); + let err = assert_ready_err!(fut.poll()); + assert_eq!("oh no", err); +} diff --git a/third_party/rust/tokio-stream/tests/stream_empty.rs b/third_party/rust/tokio-stream/tests/stream_empty.rs new file mode 100644 index 0000000000..c06f5c41c0 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_empty.rs @@ -0,0 +1,11 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = stream::empty::<i32>(); + + for _ in 0..2 { + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(None, stream.next().await); + } +} diff --git a/third_party/rust/tokio-stream/tests/stream_fuse.rs b/third_party/rust/tokio-stream/tests/stream_fuse.rs new file mode 100644 index 0000000000..9b6cf054cf --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_fuse.rs @@ -0,0 +1,50 @@ +use tokio_stream::{Stream, StreamExt}; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +// a stream which alternates between Some and None +struct Alternate { + state: i32, +} + +impl Stream for Alternate { + type Item = i32; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + let val = self.state; + self.state += 1; + + // if it's even, Some(i32), else None + if val % 2 == 0 { + Poll::Ready(Some(val)) + } else { + Poll::Ready(None) + } + } +} + +#[tokio::test] +async fn basic_usage() { + let mut stream = Alternate { state: 0 }; + + // the stream goes back and forth + assert_eq!(stream.next().await, Some(0)); + assert_eq!(stream.next().await, None); + assert_eq!(stream.next().await, Some(2)); + assert_eq!(stream.next().await, None); + + // however, once it is fused + let mut stream = stream.fuse(); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(stream.next().await, None); + + // it will always return `None` after the first time. + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + assert_eq!(stream.size_hint(), (0, Some(0))); +} diff --git a/third_party/rust/tokio-stream/tests/stream_iter.rs b/third_party/rust/tokio-stream/tests/stream_iter.rs new file mode 100644 index 0000000000..8b9ee3ce5b --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_iter.rs @@ -0,0 +1,18 @@ +use tokio_stream as stream; +use tokio_test::task; + +use std::iter; + +#[tokio::test] +async fn coop() { + let mut stream = task::spawn(stream::iter(iter::repeat(1))); + + for _ in 0..10_000 { + if stream.poll_next().is_pending() { + assert!(stream.is_woken()); + return; + } + } + + panic!("did not yield"); +} diff --git a/third_party/rust/tokio-stream/tests/stream_merge.rs b/third_party/rust/tokio-stream/tests/stream_merge.rs new file mode 100644 index 0000000000..f603bccf88 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_merge.rs @@ -0,0 +1,83 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; +use tokio_test::task; +use tokio_test::{assert_pending, assert_ready}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +#[tokio::test] +async fn merge_sync_streams() { + let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5])); + + for i in 0..7 { + let rem = 7 - i; + assert_eq!(s.size_hint(), (rem, Some(rem))); + assert_eq!(Some(i), s.next().await); + } + + assert!(s.next().await.is_none()); +} + +#[tokio::test] +async fn merge_async_streams() { + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let mut rx = task::spawn(rx1.merge(rx2)); + + assert_eq!(rx.size_hint(), (0, None)); + + assert_pending!(rx.poll_next()); + + tx1.send(1).unwrap(); + + assert!(rx.is_woken()); + assert_eq!(Some(1), assert_ready!(rx.poll_next())); + + assert_pending!(rx.poll_next()); + tx2.send(2).unwrap(); + + assert!(rx.is_woken()); + assert_eq!(Some(2), assert_ready!(rx.poll_next())); + assert_pending!(rx.poll_next()); + + drop(tx1); + assert!(rx.is_woken()); + assert_pending!(rx.poll_next()); + + tx2.send(3).unwrap(); + assert!(rx.is_woken()); + assert_eq!(Some(3), assert_ready!(rx.poll_next())); + assert_pending!(rx.poll_next()); + + drop(tx2); + assert!(rx.is_woken()); + assert_eq!(None, assert_ready!(rx.poll_next())); +} + +#[test] +fn size_overflow() { + struct Monster; + + impl tokio_stream::Stream for Monster { + type Item = (); + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<()>> { + panic!() + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (usize::MAX, Some(usize::MAX)) + } + } + + let m1 = Monster; + let m2 = Monster; + let m = m1.merge(m2); + assert_eq!(m.size_hint(), (usize::MAX, None)); +} diff --git a/third_party/rust/tokio-stream/tests/stream_once.rs b/third_party/rust/tokio-stream/tests/stream_once.rs new file mode 100644 index 0000000000..f32bad3a12 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_once.rs @@ -0,0 +1,12 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut one = stream::once(1); + + assert_eq!(one.size_hint(), (1, Some(1))); + assert_eq!(Some(1), one.next().await); + + assert_eq!(one.size_hint(), (0, Some(0))); + assert_eq!(None, one.next().await); +} diff --git a/third_party/rust/tokio-stream/tests/stream_panic.rs b/third_party/rust/tokio-stream/tests/stream_panic.rs new file mode 100644 index 0000000000..22c1c20800 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_panic.rs @@ -0,0 +1,55 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery + +use parking_lot::{const_mutex, Mutex}; +use std::error::Error; +use std::panic; +use std::sync::Arc; +use tokio::time::Duration; +use tokio_stream::{self as stream, StreamExt}; + +fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> { + static PANIC_MUTEX: Mutex<()> = const_mutex(()); + + { + let _guard = PANIC_MUTEX.lock(); + let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); + + let prev_hook = panic::take_hook(); + { + let panic_file = panic_file.clone(); + panic::set_hook(Box::new(move |panic_info| { + let panic_location = panic_info.location().unwrap(); + panic_file + .lock() + .clone_from(&Some(panic_location.file().to_string())); + })); + } + + let result = panic::catch_unwind(func); + // Return to the previously set panic hook (maybe default) so that we get nice error + // messages in the tests. + panic::set_hook(prev_hook); + + if result.is_err() { + panic_file.lock().clone() + } else { + None + } + } +} + +#[test] +fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> { + let panic_location_file = test_panic(|| { + let iter = vec![1, 2, 3].into_iter(); + let stream0 = stream::iter(iter); + + let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2)); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} diff --git a/third_party/rust/tokio-stream/tests/stream_pending.rs b/third_party/rust/tokio-stream/tests/stream_pending.rs new file mode 100644 index 0000000000..87b5d03bda --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_pending.rs @@ -0,0 +1,14 @@ +use tokio_stream::{self as stream, Stream, StreamExt}; +use tokio_test::{assert_pending, task}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = stream::pending::<i32>(); + + for _ in 0..2 { + assert_eq!(stream.size_hint(), (0, None)); + + let mut next = task::spawn(async { stream.next().await }); + assert_pending!(next.poll()); + } +} diff --git a/third_party/rust/tokio-stream/tests/stream_stream_map.rs b/third_party/rust/tokio-stream/tests/stream_stream_map.rs new file mode 100644 index 0000000000..ffc489b32e --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_stream_map.rs @@ -0,0 +1,387 @@ +use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap}; +use tokio_test::{assert_ok, assert_pending, assert_ready, task}; + +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + +use std::pin::Pin; + +macro_rules! assert_ready_some { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + Some(v) => v, + None => panic!("expected `Some`, got `None`"), + } + }; +} + +macro_rules! assert_ready_none { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + None => {} + Some(v) => panic!("expected `None`, got `Some({:?})`", v), + } + }; +} + +#[tokio::test] +async fn empty() { + let mut map = StreamMap::<&str, stream::Pending<()>>::new(); + + assert_eq!(map.len(), 0); + assert!(map.is_empty()); + + assert!(map.next().await.is_none()); + assert!(map.next().await.is_none()); + + assert!(map.remove("foo").is_none()); +} + +#[tokio::test] +async fn single_entry() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let rx = Box::pin(rx); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + assert!(map.contains_key("foo")); + assert!(!map.contains_key("bar")); + + assert_eq!(map.len(), 1); + assert!(!map.is_empty()); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + drop(tx); + assert!(map.is_woken()); + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn multiple_entries() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let rx1 = Box::pin(rx1); + let rx2 = Box::pin(rx2); + + map.insert("foo", rx1); + map.insert("bar", rx2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx2.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "bar"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(3)); + assert_ok!(tx2.send(4)); + + assert!(map.is_woken()); + + // Given the randomization, there is no guarantee what order the values will + // be received in. + let mut v = (0..2) + .map(|_| assert_ready_some!(map.poll_next())) + .collect::<Vec<_>>(); + + assert_pending!(map.poll_next()); + + v.sort_unstable(); + assert_eq!(v[0].0, "bar"); + assert_eq!(v[0].1, 4); + assert_eq!(v[1].0, "foo"); + assert_eq!(v[1].1, 3); + + drop(tx1); + assert!(map.is_woken()); + assert_pending!(map.poll_next()); + drop(tx2); + + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn insert_remove() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let rx = Box::pin(rx); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + let rx = map.remove("foo").unwrap(); + + assert_ok!(tx.send(1)); + + assert!(!map.is_woken()); + assert_ready_none!(map.poll_next()); + + assert!(map.insert("bar", rx).is_none()); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "bar"); + assert_eq!(v.1, 1); + + assert!(map.remove("bar").is_some()); + assert_ready_none!(map.poll_next()); + + assert!(map.is_empty()); + assert_eq!(0, map.len()); +} + +#[tokio::test] +async fn replace() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let rx1 = Box::pin(rx1); + let rx2 = Box::pin(rx2); + + assert!(map.insert("foo", rx1).is_none()); + + assert_pending!(map.poll_next()); + + let _rx1 = map.insert("foo", rx2).unwrap(); + + assert_pending!(map.poll_next()); + + tx1.send(1).unwrap(); + assert_pending!(map.poll_next()); + + tx2.send(2).unwrap(); + assert!(map.is_woken()); + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "foo"); + assert_eq!(v.1, 2); +} + +#[test] +fn size_hint_with_upper() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_eq!(3, map.len()); + assert!(!map.is_empty()); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (6, Some(6))); +} + +#[test] +fn size_hint_without_upper() { + let mut map = StreamMap::new(); + + map.insert("a", pin_box(stream::iter(vec![1]))); + map.insert("b", pin_box(stream::iter(vec![1, 2]))); + map.insert("c", pin_box(pending())); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (3, None)); +} + +#[test] +fn new_capacity_zero() { + let map = StreamMap::<&str, stream::Pending<()>>::new(); + assert_eq!(0, map.capacity()); + + assert!(map.keys().next().is_none()); +} + +#[test] +fn with_capacity() { + let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); + assert!(10 <= map.capacity()); + + assert!(map.keys().next().is_none()); +} + +#[test] +fn iter_keys() { + let mut map = StreamMap::new(); + + map.insert("a", pending::<i32>()); + map.insert("b", pending()); + map.insert("c", pending()); + + let mut keys = map.keys().collect::<Vec<_>>(); + keys.sort_unstable(); + + assert_eq!(&keys[..], &[&"a", &"b", &"c"]); +} + +#[test] +fn iter_values() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>(); + + size_hints.sort_unstable(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn iter_values_mut() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map + .values_mut() + .map(|s: &mut _| s.size_hint().0) + .collect::<Vec<_>>(); + + size_hints.sort_unstable(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn clear() { + let mut map = task::spawn(StreamMap::new()); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_ready_some!(map.poll_next()); + + map.clear(); + + assert_ready_none!(map.poll_next()); + assert!(map.is_empty()); +} + +#[test] +fn contains_key_borrow() { + let mut map = StreamMap::new(); + map.insert("foo".to_string(), pending::<()>()); + + assert!(map.contains_key("foo")); +} + +#[test] +fn one_ready_many_none() { + // Run a few times because of randomness + for _ in 0..100 { + let mut map = task::spawn(StreamMap::new()); + + map.insert(0, pin_box(stream::empty())); + map.insert(1, pin_box(stream::empty())); + map.insert(2, pin_box(stream::once("hello"))); + map.insert(3, pin_box(stream::pending())); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v, (2, "hello")); + } +} + +#[cfg(not(target_os = "wasi"))] +proptest::proptest! { + #[test] + fn fuzz_pending_complete_mix(kinds: Vec<bool>) { + use std::task::{Context, Poll}; + + struct DidPoll<T> { + did_poll: bool, + inner: T, + } + + impl<T: Stream + Unpin> Stream for DidPoll<T> { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll<Option<T::Item>> + { + self.did_poll = true; + Pin::new(&mut self.inner).poll_next(cx) + } + } + + for _ in 0..10 { + let mut map = task::spawn(StreamMap::new()); + let mut expect = 0; + + for (i, &is_empty) in kinds.iter().enumerate() { + let inner = if is_empty { + pin_box(stream::empty::<()>()) + } else { + expect += 1; + pin_box(stream::pending::<()>()) + }; + + let stream = DidPoll { + did_poll: false, + inner, + }; + + map.insert(i, stream); + } + + if expect == 0 { + assert_ready_none!(map.poll_next()); + } else { + assert_pending!(map.poll_next()); + + assert_eq!(expect, map.values().count()); + + for stream in map.values() { + assert!(stream.did_poll); + } + } + } + } +} + +fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> { + Box::pin(s) +} diff --git a/third_party/rust/tokio-stream/tests/stream_timeout.rs b/third_party/rust/tokio-stream/tests/stream_timeout.rs new file mode 100644 index 0000000000..2338f83358 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/stream_timeout.rs @@ -0,0 +1,109 @@ +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time::{self, sleep, Duration}; +use tokio_stream::{self, StreamExt}; +use tokio_test::*; + +use futures::stream; + +async fn maybe_sleep(idx: i32) -> i32 { + if idx % 2 == 0 { + sleep(ms(200)).await; + } + idx +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} + +#[tokio::test] +async fn basic_usage() { + time::pause(); + + // Items 2 and 4 time out. If we run the stream until it completes, + // we end up with the following items: + // + // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] + + let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(150)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); + + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + + // Third item is ready immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Fourth item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(60)).await; + assert_pending!(stream.poll_next()); // nothing ready yet + + time::advance(ms(60)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + time::advance(ms(120)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(4))); + + // Done. + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn return_elapsed_errors_only_once() { + time::pause(); + + let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` + // error is returned. + assert_pending!(stream.poll_next()); + // + time::advance(ms(51)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + // deadline elapses again, but no error is returned + time::advance(ms(50)).await; + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Done + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn no_timeouts() { + let stream = stream::iter(vec![1, 3, 5]) + .then(maybe_sleep) + .timeout(ms(100)); + + let mut stream = task::spawn(stream); + + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + assert_ready_eq!(stream.poll_next(), Some(Ok(5))); + assert_ready_eq!(stream.poll_next(), None); +} diff --git a/third_party/rust/tokio-stream/tests/support/mpsc.rs b/third_party/rust/tokio-stream/tests/support/mpsc.rs new file mode 100644 index 0000000000..09dbe04215 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/support/mpsc.rs @@ -0,0 +1,15 @@ +use async_stream::stream; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::Stream; + +pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let stream = stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }; + + (tx, stream) +} diff --git a/third_party/rust/tokio-stream/tests/time_throttle.rs b/third_party/rust/tokio-stream/tests/time_throttle.rs new file mode 100644 index 0000000000..e6c9917be3 --- /dev/null +++ b/third_party/rust/tokio-stream/tests/time_throttle.rs @@ -0,0 +1,28 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))] + +use tokio::time; +use tokio_stream::StreamExt; +use tokio_test::*; + +use std::time::Duration; + +#[tokio::test] +async fn usage() { + time::pause(); + + let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100))); + + assert_ready!(stream.poll_next()); + assert_pending!(stream.poll_next()); + + time::advance(Duration::from_millis(90)).await; + + assert_pending!(stream.poll_next()); + + time::advance(Duration::from_millis(101)).await; + + assert!(stream.is_woken()); + + assert_ready!(stream.poll_next()); +} diff --git a/third_party/rust/tokio-stream/tests/watch.rs b/third_party/rust/tokio-stream/tests/watch.rs new file mode 100644 index 0000000000..a56254edef --- /dev/null +++ b/third_party/rust/tokio-stream/tests/watch.rs @@ -0,0 +1,29 @@ +#![cfg(feature = "sync")] + +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::StreamExt; + +#[tokio::test] +async fn message_not_twice() { + let (tx, rx) = watch::channel("hello"); + + let mut counter = 0; + let mut stream = WatchStream::new(rx).map(move |payload| { + println!("{}", payload); + if payload == "goodbye" { + counter += 1; + } + if counter >= 2 { + panic!("too many goodbyes"); + } + }); + + let task = tokio::spawn(async move { while stream.next().await.is_some() {} }); + + // Send goodbye just once + tx.send("goodbye").unwrap(); + + drop(tx); + task.await.unwrap(); +} |