diff options
Diffstat (limited to 'third_party/rust/tokio/tests/io_mem_stream.rs')
-rw-r--r-- | third_party/rust/tokio/tests/io_mem_stream.rs | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/io_mem_stream.rs b/third_party/rust/tokio/tests/io_mem_stream.rs new file mode 100644 index 0000000000..a2c2dadfc9 --- /dev/null +++ b/third_party/rust/tokio/tests/io_mem_stream.rs @@ -0,0 +1,121 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; + +#[tokio::test] +async fn ping_pong() { + let (mut a, mut b) = duplex(32); + + let mut buf = [0u8; 4]; + + a.write_all(b"ping").await.unwrap(); + b.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"ping"); + + b.write_all(b"pong").await.unwrap(); + a.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"pong"); +} + +#[tokio::test] +async fn across_tasks() { + let (mut a, mut b) = duplex(32); + + let t1 = tokio::spawn(async move { + a.write_all(b"ping").await.unwrap(); + let mut buf = [0u8; 4]; + a.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"pong"); + }); + + let t2 = tokio::spawn(async move { + let mut buf = [0u8; 4]; + b.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"ping"); + b.write_all(b"pong").await.unwrap(); + }); + + t1.await.unwrap(); + t2.await.unwrap(); +} + +#[tokio::test] +async fn disconnect() { + let (mut a, mut b) = duplex(32); + + let t1 = tokio::spawn(async move { + a.write_all(b"ping").await.unwrap(); + // and dropped + }); + + let t2 = tokio::spawn(async move { + let mut buf = [0u8; 32]; + let n = b.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..n], b"ping"); + + let n = b.read(&mut buf).await.unwrap(); + assert_eq!(n, 0); + }); + + t1.await.unwrap(); + t2.await.unwrap(); +} + +#[tokio::test] +async fn disconnect_reader() { + let (a, mut b) = duplex(2); + + let t1 = tokio::spawn(async move { + // this will block, as not all data fits into duplex + b.write_all(b"ping").await.unwrap_err(); + }); + + let t2 = tokio::spawn(async move { + // here we drop the reader side, and we expect the writer in the other + // task to exit with an error + drop(a); + }); + + t2.await.unwrap(); + t1.await.unwrap(); +} + +#[tokio::test] +async fn max_write_size() { + let (mut a, mut b) = duplex(32); + + let t1 = tokio::spawn(async move { + let n = a.write(&[0u8; 64]).await.unwrap(); + assert_eq!(n, 32); + let n = a.write(&[0u8; 64]).await.unwrap(); + assert_eq!(n, 4); + }); + + let mut buf = [0u8; 4]; + b.read_exact(&mut buf).await.unwrap(); + + t1.await.unwrap(); + + // drop b only after task t1 finishes writing + drop(b); +} + +#[tokio::test] +async fn duplex_is_cooperative() { + let (mut tx, mut rx) = tokio::io::duplex(1024 * 8); + + tokio::select! { + biased; + + _ = async { + loop { + let buf = [3u8; 4096]; + tx.write_all(&buf).await.unwrap(); + let mut buf = [0u8; 4096]; + rx.read(&mut buf).await.unwrap(); + } + } => {}, + _ = tokio::task::yield_now() => {} + } +} |