From 2aa4a82499d4becd2284cdb482213d541b8804dd Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 28 Apr 2024 16:29:10 +0200 Subject: Adding upstream version 86.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/tokio/tests/stream_collect.rs | 172 +++++++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 third_party/rust/tokio/tests/stream_collect.rs (limited to 'third_party/rust/tokio/tests/stream_collect.rs') diff --git a/third_party/rust/tokio/tests/stream_collect.rs b/third_party/rust/tokio/tests/stream_collect.rs new file mode 100644 index 0000000000..70051e7f67 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_collect.rs @@ -0,0 +1,172 @@ +use tokio::stream::{self, StreamExt}; +use tokio::sync::mpsc; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; + +use bytes::{Bytes, BytesMut}; + +#[allow(clippy::let_unit_value)] +#[tokio::test] +async fn empty_unit() { + // Drains the stream. + let mut iter = vec![(), (), ()].into_iter(); + let _: () = stream::iter(&mut iter).collect().await; + assert!(iter.next().is_none()); +} + +#[tokio::test] +async fn empty_vec() { + let coll: Vec = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_box_slice() { + let coll: Box<[u32]> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_bytes() { + let coll: Bytes = stream::empty::<&[u8]>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_bytes_mut() { + let coll: BytesMut = stream::empty::<&[u8]>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_string() { + let coll: String = stream::empty::<&str>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_result() { + let coll: Result, &str> = stream::empty().collect().await; + assert_eq!(Ok(vec![]), coll); +} + +#[tokio::test] +async fn collect_vec_items() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::>()); + + assert_pending!(fut.poll()); + + tx.send(1).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(2).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!(vec![1, 2], coll); +} + +#[tokio::test] +async fn collect_string_items() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::()); + + assert_pending!(fut.poll()); + + tx.send("hello ".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_str_items() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::()); + + assert_pending!(fut.poll()); + + tx.send("hello ").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_bytes() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::()); + + assert_pending!(fut.poll()); + + tx.send(&b"hello "[..]).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(&b"world"[..]).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!(&b"hello world"[..], coll); +} + +#[tokio::test] +async fn collect_results_ok() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Ok("world")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready_ok!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_results_err() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Err("oh no")).unwrap(); + assert!(fut.is_woken()); + let err = assert_ready_err!(fut.poll()); + assert_eq!("oh no", err); +} -- cgit v1.2.3